Files
2026-05-08 01:59:04 +02:00

287 lines
8.6 KiB
Rust

//! Integration test for client disconnection (T051)
//!
//! This test verifies that pending agent requests are cleaned up when a client
//! disconnects, and that other clients are unaffected.
//!
//! Test scenario:
//! 1. Register pending requests for multiple clients
//! 2. Simulate client disconnection
//! 3. Verify cleanup occurs for disconnected client
//! 4. Verify other clients are unaffected
use dirigent_acp_api::agent_requests::AgentRequestTracker;
use serde_json::json;
/// A disconnecting client loses every one of its pending requests, and each
/// waiting receiver resolves to an error instead of a response.
#[tokio::test]
async fn test_disconnect_single_client() {
    let tracker = AgentRequestTracker::new();
    let client_id = "test-client";

    // Three in-flight requests with ids 0, 1 and 2, registered in that order.
    let receivers = [0, 1, 2].map(|request_id| tracker.register(client_id, json!(request_id)));
    assert_eq!(tracker.pending_count(), 3);
    assert_eq!(tracker.client_pending_count(client_id), 3);

    // A disconnect is modelled as clearing everything owned by the client.
    tracker.clear(Some(client_id));

    // Nothing may remain, neither globally nor for this client.
    assert_eq!(tracker.pending_count(), 0);
    assert_eq!(tracker.client_pending_count(client_id), 0);

    // Every receiver must observe an error, checked in registration order.
    for receiver in receivers {
        assert!(receiver.await.is_err());
    }
}
/// Disconnecting one client removes only that client's pending requests;
/// requests registered by the other clients stay pending and can still be
/// completed afterwards.
#[tokio::test]
async fn test_disconnect_multiple_clients() {
    let tracker = AgentRequestTracker::new();
    let (first, second, third) = ("client-1", "client-2", "client-3");

    // Pending layout: first -> 2 requests, second -> 3 requests, third -> 1 request.
    let first_rx0 = tracker.register(first, json!(0));
    let first_rx1 = tracker.register(first, json!(1));
    let second_receivers = [0, 1, 2].map(|request_id| tracker.register(second, json!(request_id)));
    let third_rx0 = tracker.register(third, json!(0));

    assert_eq!(tracker.pending_count(), 6);
    for (client, expected) in [(first, 2), (second, 3), (third, 1)] {
        assert_eq!(tracker.client_pending_count(client), expected);
    }

    // Only the second client goes away.
    tracker.clear(Some(second));

    // Its three requests are gone; the other counts are untouched.
    assert_eq!(tracker.pending_count(), 3);
    for (client, expected) in [(first, 2), (second, 0), (third, 1)] {
        assert_eq!(tracker.client_pending_count(client), expected);
    }

    // The disconnected client's receivers all resolve to an error.
    for receiver in second_receivers {
        assert!(receiver.await.is_err());
    }

    // The surviving clients still receive their responses.
    let first_reply0 = json!({"result": "client1-0"});
    tracker.complete(first, json!(0), first_reply0).unwrap();
    assert_eq!(first_rx0.await.unwrap()["result"], "client1-0");

    let third_reply0 = json!({"result": "client3-0"});
    tracker.complete(third, json!(0), third_reply0).unwrap();
    assert_eq!(third_rx0.await.unwrap()["result"], "client3-0");

    // Finish the last outstanding request of the first client.
    let first_reply1 = json!({"result": "client1-1"});
    tracker.complete(first, json!(1), first_reply1).unwrap();
    assert_eq!(first_rx1.await.unwrap()["result"], "client1-1");

    // With everything completed or cleared, the tracker is empty again.
    assert_eq!(tracker.pending_count(), 0);
}
/// After a disconnect, the same client id may register the same request ids
/// again; the new requests complete normally while the old receivers error.
#[tokio::test]
async fn test_disconnect_then_reconnect() {
    let tracker = AgentRequestTracker::new();
    let client_id = "test-client";

    // First session: two requests are in flight when the client drops.
    let stale_receivers = [0, 1].map(|request_id| tracker.register(client_id, json!(request_id)));
    assert_eq!(tracker.pending_count(), 2);

    tracker.clear(Some(client_id));
    assert_eq!(tracker.pending_count(), 0);

    // Receivers from the first session can only ever see an error.
    for receiver in stale_receivers {
        assert!(receiver.await.is_err());
    }

    // Second session: identical client id and request ids are reused.
    let fresh_rx0 = tracker.register(client_id, json!(0));
    let fresh_rx1 = tracker.register(client_id, json!(1));
    assert_eq!(tracker.pending_count(), 2);

    // Answer both new requests, in id order.
    for (request_id, label) in [(0, "new-0"), (1, "new-1")] {
        tracker
            .complete(client_id, json!(request_id), json!({"result": label}))
            .unwrap();
    }

    // The new receivers get the new responses and nothing is left pending.
    assert_eq!(fresh_rx0.await.unwrap()["result"], "new-0");
    assert_eq!(fresh_rx1.await.unwrap()["result"], "new-1");
    assert_eq!(tracker.pending_count(), 0);
}
/// Clearing a client that never registered anything leaves the tracker empty.
#[tokio::test]
async fn test_disconnect_no_pending_requests() {
    let tracker = AgentRequestTracker::new();
    let idle_client = "test-client";

    // Precondition: the client owns no pending requests.
    assert_eq!(tracker.client_pending_count(idle_client), 0);

    // Disconnecting it must not disturb the (empty) tracker.
    tracker.clear(Some(idle_client));
    assert_eq!(tracker.pending_count(), 0);
}
/// `clear(None)` drops the pending requests of every client at once, as would
/// happen on server shutdown.
#[tokio::test]
async fn test_clear_all_clients() {
    let tracker = AgentRequestTracker::new();
    let clients = ["client-1", "client-2", "client-3"];

    // One pending request (id 0) per client, registered in client order.
    let receivers = clients.map(|client| tracker.register(client, json!(0)));
    assert_eq!(tracker.pending_count(), 3);

    // No client filter: everything goes.
    tracker.clear(None);

    // Global and per-client counts all drop to zero.
    assert_eq!(tracker.pending_count(), 0);
    for client in clients {
        assert_eq!(tracker.client_pending_count(client), 0);
    }

    // Each receiver resolves to an error.
    for receiver in receivers {
        assert!(receiver.await.is_err());
    }
}
/// A request completed before the disconnect keeps its response; only the
/// requests still pending at disconnect time resolve to an error.
#[tokio::test]
async fn test_disconnect_race_with_completion() {
    let tracker = AgentRequestTracker::new();
    let client_id = "test-client";

    // Three requests in flight: id 0 will be answered, ids 1 and 2 will not.
    let answered_rx = tracker.register(client_id, json!(0));
    let abandoned_receivers =
        [1, 2].map(|request_id| tracker.register(client_id, json!(request_id)));
    assert_eq!(tracker.pending_count(), 3);

    // Answer request 0; it leaves the pending set immediately.
    let reply = json!({"result": "0"});
    tracker.complete(client_id, json!(0), reply).unwrap();
    assert_eq!(tracker.pending_count(), 2);

    // The disconnect then only has the two remaining requests to clear.
    tracker.clear(Some(client_id));
    assert_eq!(tracker.pending_count(), 0);

    // The answered request still delivers its response...
    assert_eq!(answered_rx.await.unwrap()["result"], "0");

    // ...while the abandoned ones resolve to an error.
    for receiver in abandoned_receivers {
        assert!(receiver.await.is_err());
    }
}
/// Completing a request after its client disconnected is reported as an
/// error rather than panicking.
#[tokio::test]
async fn test_partial_disconnect_completion() {
    let tracker = AgentRequestTracker::new();
    let client_id = "test-client";

    // Keep the receiver bound for the whole test; its value is never read.
    let _pending_rx = tracker.register(client_id, json!(0));
    assert_eq!(tracker.pending_count(), 1);

    // The client goes away before any response arrives.
    tracker.clear(Some(client_id));
    assert_eq!(tracker.pending_count(), 0);

    // A late response for the cleared request must be rejected.
    let late_reply = json!({"result": "late"});
    assert!(tracker.complete(client_id, json!(0), late_reply).is_err());
}
/// Races one delayed disconnect against 50 concurrent completion attempts.
///
/// The receivers returned by `register` are kept alive for the whole test, so
/// a completion attempt is not affected by its receiver having been dropped
/// (presumably the receiver is a oneshot-style channel end — verify against
/// `AgentRequestTracker`). This also lets the test check what each receiver
/// actually observed.
///
/// How many completions land before the disconnect depends on task timing, so
/// the test asserts only invariants that hold for every interleaving:
/// * exactly one disconnect ran and all 50 completion attempts reported a result,
/// * nothing is left pending,
/// * the number of receivers that got a response equals the number of
///   completions that reported success, and each response carries its own id.
#[tokio::test]
async fn test_concurrent_disconnect_and_complete() {
    use tokio::task::JoinSet;
    use tokio::time::{sleep, Duration};

    /// What a spawned task ended up doing.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum Outcome {
        Disconnected,
        Completed,
        Failed,
    }

    let tracker = AgentRequestTracker::new();
    let client_id = "test-client";

    // Register many requests and hold on to every receiver, paired with its id.
    let receivers: Vec<_> = (0..50u64)
        .map(|i| (i, tracker.register(client_id, json!(i))))
        .collect();
    assert_eq!(tracker.pending_count(), 50);

    let mut join_set = JoinSet::new();

    // One task disconnects the client after a short delay.
    {
        let tracker = tracker.clone();
        join_set.spawn(async move {
            sleep(Duration::from_millis(10)).await;
            tracker.clear(Some(client_id));
            Outcome::Disconnected
        });
    }

    // Fifty tasks each try to complete one request, staggered by 0-4 ms so the
    // attempts are spread out relative to the disconnect.
    for i in 0..50u64 {
        let tracker = tracker.clone();
        join_set.spawn(async move {
            sleep(Duration::from_millis(i % 5)).await;
            match tracker.complete(client_id, json!(i), json!({"request": i})) {
                Ok(_) => Outcome::Completed,
                Err(_) => Outcome::Failed,
            }
        });
    }

    // Wait for every task; a panicking task fails the test here.
    let mut outcomes = Vec::new();
    while let Some(joined) = join_set.join_next().await {
        outcomes.push(joined.unwrap());
    }

    // Should have 1 disconnect + 50 completion attempts.
    assert_eq!(outcomes.len(), 51);
    let count_of = |wanted: Outcome| outcomes.iter().filter(|&&o| o == wanted).count();
    let completed = count_of(Outcome::Completed);
    assert_eq!(count_of(Outcome::Disconnected), 1);
    assert_eq!(completed + count_of(Outcome::Failed), 50);

    // All requests should be cleaned up, whichever side won each race.
    assert_eq!(tracker.pending_count(), 0);

    // Every request was either completed or cleared, so every receiver
    // resolves. Those that resolve to a response must carry their own id, and
    // there must be exactly one per successful completion.
    let mut delivered = 0;
    for (i, receiver) in receivers {
        if let Ok(response) = receiver.await {
            assert_eq!(response["request"], i);
            delivered += 1;
        }
    }
    assert_eq!(delivered, completed);
}