//! Integration test for client disconnection (T051) //! //! This test verifies that pending agent requests are cleaned up when a client //! disconnects, and that other clients are unaffected. //! //! Test scenario: //! 1. Register pending requests for multiple clients //! 2. Simulate client disconnection //! 3. Verify cleanup occurs for disconnected client //! 4. Verify other clients are unaffected use dirigent_acp_api::agent_requests::AgentRequestTracker; use serde_json::json; #[tokio::test] async fn test_disconnect_single_client() { let tracker = AgentRequestTracker::new(); let client_id = "test-client"; // Register 3 pending requests let rx1 = tracker.register(client_id, json!(0)); let rx2 = tracker.register(client_id, json!(1)); let rx3 = tracker.register(client_id, json!(2)); assert_eq!(tracker.pending_count(), 3); assert_eq!(tracker.client_pending_count(client_id), 3); // Simulate client disconnection - clear all requests for this client tracker.clear(Some(client_id)); // All requests should be removed assert_eq!(tracker.pending_count(), 0); assert_eq!(tracker.client_pending_count(client_id), 0); // All receivers should get errors (channels closed) assert!(rx1.await.is_err()); assert!(rx2.await.is_err()); assert!(rx3.await.is_err()); } #[tokio::test] async fn test_disconnect_multiple_clients() { let tracker = AgentRequestTracker::new(); let client1 = "client-1"; let client2 = "client-2"; let client3 = "client-3"; // Register requests for all clients let rx1_1 = tracker.register(client1, json!(0)); let rx1_2 = tracker.register(client1, json!(1)); let rx2_1 = tracker.register(client2, json!(0)); let rx2_2 = tracker.register(client2, json!(1)); let rx2_3 = tracker.register(client2, json!(2)); let rx3_1 = tracker.register(client3, json!(0)); assert_eq!(tracker.pending_count(), 6); assert_eq!(tracker.client_pending_count(client1), 2); assert_eq!(tracker.client_pending_count(client2), 3); assert_eq!(tracker.client_pending_count(client3), 1); // Disconnect client2 tracker.clear(Some(client2)); // Only client2's requests should be removed assert_eq!(tracker.pending_count(), 3); assert_eq!(tracker.client_pending_count(client1), 2); assert_eq!(tracker.client_pending_count(client2), 0); assert_eq!(tracker.client_pending_count(client3), 1); // Client2's receivers should error assert!(rx2_1.await.is_err()); assert!(rx2_2.await.is_err()); assert!(rx2_3.await.is_err()); // Client1 and client3 should still work tracker.complete(client1, json!(0), json!({"result": "client1-0"})).unwrap(); assert_eq!(rx1_1.await.unwrap()["result"], "client1-0"); tracker.complete(client3, json!(0), json!({"result": "client3-0"})).unwrap(); assert_eq!(rx3_1.await.unwrap()["result"], "client3-0"); // Complete remaining client1 request tracker.complete(client1, json!(1), json!({"result": "client1-1"})).unwrap(); assert_eq!(rx1_2.await.unwrap()["result"], "client1-1"); // All cleaned up assert_eq!(tracker.pending_count(), 0); } #[tokio::test] async fn test_disconnect_then_reconnect() { let tracker = AgentRequestTracker::new(); let client_id = "test-client"; // First connection - register requests let rx1 = tracker.register(client_id, json!(0)); let rx2 = tracker.register(client_id, json!(1)); assert_eq!(tracker.pending_count(), 2); // Disconnect - cleanup tracker.clear(Some(client_id)); assert_eq!(tracker.pending_count(), 0); // Old receivers should error assert!(rx1.await.is_err()); assert!(rx2.await.is_err()); // Reconnect - register new requests (same client_id, same request_ids) let rx3 = tracker.register(client_id, json!(0)); let rx4 = tracker.register(client_id, json!(1)); assert_eq!(tracker.pending_count(), 2); // Complete new requests tracker.complete(client_id, json!(0), json!({"result": "new-0"})).unwrap(); tracker.complete(client_id, json!(1), json!({"result": "new-1"})).unwrap(); // New receivers should get responses assert_eq!(rx3.await.unwrap()["result"], "new-0"); assert_eq!(rx4.await.unwrap()["result"], "new-1"); assert_eq!(tracker.pending_count(), 0); } #[tokio::test] async fn test_disconnect_no_pending_requests() { let tracker = AgentRequestTracker::new(); let client_id = "test-client"; // No pending requests assert_eq!(tracker.client_pending_count(client_id), 0); // Disconnect should be no-op tracker.clear(Some(client_id)); assert_eq!(tracker.pending_count(), 0); } #[tokio::test] async fn test_clear_all_clients() { let tracker = AgentRequestTracker::new(); let client1 = "client-1"; let client2 = "client-2"; let client3 = "client-3"; // Register requests for multiple clients let rx1 = tracker.register(client1, json!(0)); let rx2 = tracker.register(client2, json!(0)); let rx3 = tracker.register(client3, json!(0)); assert_eq!(tracker.pending_count(), 3); // Clear all (simulating server shutdown) tracker.clear(None); // All should be removed assert_eq!(tracker.pending_count(), 0); assert_eq!(tracker.client_pending_count(client1), 0); assert_eq!(tracker.client_pending_count(client2), 0); assert_eq!(tracker.client_pending_count(client3), 0); // All receivers should error assert!(rx1.await.is_err()); assert!(rx2.await.is_err()); assert!(rx3.await.is_err()); } #[tokio::test] async fn test_disconnect_race_with_completion() { let tracker = AgentRequestTracker::new(); let client_id = "test-client"; // Register multiple requests let rx1 = tracker.register(client_id, json!(0)); let rx2 = tracker.register(client_id, json!(1)); let rx3 = tracker.register(client_id, json!(2)); assert_eq!(tracker.pending_count(), 3); // Complete one request tracker.complete(client_id, json!(0), json!({"result": "0"})).unwrap(); // Verify it was removed assert_eq!(tracker.pending_count(), 2); // Now disconnect (should only clear remaining requests) tracker.clear(Some(client_id)); assert_eq!(tracker.pending_count(), 0); // First receiver should have gotten response assert_eq!(rx1.await.unwrap()["result"], "0"); // Other receivers should error assert!(rx2.await.is_err()); assert!(rx3.await.is_err()); } #[tokio::test] async fn test_partial_disconnect_completion() { // Test that completing a request after disconnect fails gracefully let tracker = AgentRequestTracker::new(); let client_id = "test-client"; let _rx = tracker.register(client_id, json!(0)); assert_eq!(tracker.pending_count(), 1); // Disconnect tracker.clear(Some(client_id)); assert_eq!(tracker.pending_count(), 0); // Try to complete after disconnect - should fail let result = tracker.complete(client_id, json!(0), json!({"result": "late"})); assert!(result.is_err()); } #[tokio::test] async fn test_concurrent_disconnect_and_complete() { use tokio::task::JoinSet; let tracker = AgentRequestTracker::new(); let client_id = "test-client"; // Register many requests for i in 0..50 { tracker.register(client_id, json!(i)); } assert_eq!(tracker.pending_count(), 50); let mut join_set = JoinSet::new(); // Spawn task to disconnect after delay { let tracker_clone = tracker.clone(); join_set.spawn(async move { tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; tracker_clone.clear(Some(client_id)); "disconnected" }); } // Spawn tasks to complete requests for i in 0..50 { let tracker_clone = tracker.clone(); join_set.spawn(async move { // Small delay to ensure some complete before disconnect tokio::time::sleep(tokio::time::Duration::from_millis((i % 5) as u64)).await; let response = json!({"request": i}); match tracker_clone.complete(client_id, json!(i), response) { Ok(_) => "completed", Err(_) => "failed", } }); } // Wait for all tasks let mut results = Vec::new(); while let Some(result) = join_set.join_next().await { results.push(result.unwrap()); } // Should have 1 disconnect + 50 completion attempts assert_eq!(results.len(), 51); // Some completions succeeded, some failed (after disconnect) let disconnects = results.iter().filter(|r| r == &&"disconnected").count(); assert_eq!(disconnects, 1); // All requests should be cleaned up assert_eq!(tracker.pending_count(), 0); }