dirigent/crates/dirigent_langfuse/CLAUDE.md
2026-05-08 01:59:04 +02:00

Package: dirigent_langfuse

Phase 4 stream backend that mirrors BusEvents to a Langfuse ingestion endpoint.

Scope

  • LangfuseFactory registered as kind = "langfuse" in the StreamFactoryRegistry.
  • LangfuseStream implements SessionStream:
    • Maps each BusEvent via mapping::bus_event_to_items.
    • Buffers up to 32 items per flush; flushes eagerly when full and on shutdown.
    • POSTs {host}/api/public/ingestion with basic-auth (public_key, secret_key).
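
The buffering rule above (at most 32 items per flush, eager flush when full, final flush on shutdown) can be sketched as follows. This is a minimal illustration, not the crate's code: `Item`, `BatchBuffer`, `push`, and `drain` are hypothetical names, and the actual HTTP send is left to the caller.

```rust
/// Hypothetical stand-in for one mapped Langfuse ingestion item.
#[derive(Debug, Clone, PartialEq)]
pub struct Item(pub String);

/// Batch capacity taken from the Scope section (32 items per flush).
pub const MAX_BATCH: usize = 32;

/// Hypothetical buffer; the real LangfuseStream may organise this differently.
#[derive(Default)]
pub struct BatchBuffer {
    pending: Vec<Item>,
}

impl BatchBuffer {
    pub fn new() -> Self {
        Self::default()
    }

    /// Queue an item. Returns a full batch as soon as the buffer reaches
    /// capacity, so the caller can flush eagerly.
    pub fn push(&mut self, item: Item) -> Option<Vec<Item>> {
        self.pending.push(item);
        if self.pending.len() >= MAX_BATCH {
            Some(self.drain())
        } else {
            None
        }
    }

    /// Take everything still pending, e.g. for the final flush on shutdown.
    pub fn drain(&mut self) -> Vec<Item> {
        std::mem::take(&mut self.pending)
    }

    pub fn len(&self) -> usize {
        self.pending.len()
    }

    pub fn is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
```

In this sketch the caller POSTs whatever `push` returns and calls `drain` once more during shutdown, which keeps the buffer free of any network concerns.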

File map

  • src/lib.rs: public API (LangfuseStream, LangfuseConfig, LangfuseFactory).
  • src/client.rs: LangfuseClient (reqwest wrapper with retry) and the LangfuseStream implementation.
  • src/mapping.rs: bus_event_to_items mapping.
  • src/factory.rs: StreamFactory impl.

Event → ingestion mapping

| BusEvent variant | Langfuse item |
| --- | --- |
| SessionCreated | trace-create (id = scroll_id) |
| MessageStarted | generation-create |
| MessageCompleted | generation-update with output |
| SessionUpdate (non-tool) | skipped |
| All others | skipped |

Events without a bound scroll_id (no late-bind hit) are dropped — the implementation does NOT buffer pending events keyed by connector_id / native_session_id in Phase 4. If buffering is needed later, extend LangfuseStream::on_event.
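
A rough sketch of the mapping and the drop rule described above. The enum shapes, the field names (`trace_id`, `output`), and the function name `sketch_bus_event_to_items` are assumptions made for illustration; the real `mapping::bus_event_to_items` signature and the real `BusEvent` payloads are not shown in this document.

```rust
/// Simplified, hypothetical view of the bus events named in the table.
#[derive(Debug, Clone, PartialEq)]
pub enum BusEvent {
    SessionCreated,
    MessageStarted,
    MessageCompleted { output: String },
    SessionUpdate, // non-tool update
    Other,         // every remaining variant
}

/// Hypothetical ingestion items; field names are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub enum IngestionItem {
    TraceCreate { id: String },
    GenerationCreate { trace_id: String },
    GenerationUpdate { trace_id: String, output: String },
}

/// Map one event to zero or more items. Events with a missing or empty
/// scroll_id yield nothing, mirroring the "dropped, not buffered" rule.
pub fn sketch_bus_event_to_items(
    event: &BusEvent,
    scroll_id: Option<&str>,
) -> Vec<IngestionItem> {
    let Some(trace_id) = scroll_id.filter(|id| !id.is_empty()) else {
        return Vec::new();
    };
    match event {
        BusEvent::SessionCreated => vec![IngestionItem::TraceCreate {
            id: trace_id.to_string(),
        }],
        BusEvent::MessageStarted => vec![IngestionItem::GenerationCreate {
            trace_id: trace_id.to_string(),
        }],
        BusEvent::MessageCompleted { output } => vec![IngestionItem::GenerationUpdate {
            trace_id: trace_id.to_string(),
            output: output.clone(),
        }],
        // Skipped variants produce no ingestion items.
        BusEvent::SessionUpdate | BusEvent::Other => Vec::new(),
    }
}
```

Returning an empty `Vec` for skipped or unbound events lets the caller treat "nothing to send" uniformly, without a separate error path.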

Failure modes

  • Transport error → StreamOutcome::Failed(StreamError::Transport). Health drift applies; the stream goes Degraded after one failure and Unavailable after five consecutive failures.
  • 5xx response → retried up to 3 times with exponential backoff (100 ms → 200 ms → 400 ms → 800 ms, capped at 1 s).
  • 4xx response → returned as LangfuseError::Status(code); no retry.
  • Empty scroll_id → StreamOutcome::Skipped (not a failure).
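
The retry and health rules above can be sketched as two small pure functions plus a counter. All names here (`backoff_delay`, `decide`, `Decision`, `HealthTracker`) are hypothetical, and the reset to Healthy on success is an assumption; the document only states when the stream becomes Degraded and Unavailable.

```rust
use std::time::Duration;

pub const BASE_DELAY: Duration = Duration::from_millis(100);
pub const MAX_DELAY: Duration = Duration::from_secs(1);
pub const MAX_RETRIES: u32 = 3;

/// Delay before retry number `retry` (0-based): doubles from 100 ms, capped at 1 s.
pub fn backoff_delay(retry: u32) -> Duration {
    let factor = 2u32.saturating_pow(retry);
    BASE_DELAY.saturating_mul(factor).min(MAX_DELAY)
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Decision {
    Done,
    Retry(Duration),
    GiveUp,
}

/// Classify an HTTP status: 5xx is retried while retries remain, 4xx is not.
pub fn decide(status: u16, retries_so_far: u32) -> Decision {
    match status {
        200..=299 => Decision::Done,
        500..=599 if retries_so_far < MAX_RETRIES => {
            Decision::Retry(backoff_delay(retries_so_far))
        }
        _ => Decision::GiveUp,
    }
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Health {
    Healthy,
    Degraded,
    Unavailable,
}

/// Tracks consecutive failures; a success resets the count (assumption).
#[derive(Default)]
pub struct HealthTracker {
    consecutive_failures: u32,
}

impl HealthTracker {
    pub fn record(&mut self, ok: bool) -> Health {
        self.consecutive_failures = if ok {
            0
        } else {
            self.consecutive_failures.saturating_add(1)
        };
        match self.consecutive_failures {
            0 => Health::Healthy,
            1..=4 => Health::Degraded,
            _ => Health::Unavailable,
        }
    }
}
```

Keeping the delay and the retry decision free of I/O makes both easy to unit-test without a live Langfuse host.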

Configuration

[[streams]]
name = "langfuse-prod"
type = "langfuse"
enabled = true
[streams.scope]
kind = "connector"
connector_uid = "01985d00-..."
[streams.params]
host       = "https://langfuse.example.com"
public_key = "pk-lf-..."
secret_key = "sk-lf-..."
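
To illustrate how the three params could turn into the request described under Scope, here is a std-only sketch that builds the ingestion URL and an HTTP Basic `Authorization` header value. The helper names are hypothetical, and trimming a trailing slash from `host` is an assumption. The crate itself wraps reqwest, whose `RequestBuilder::basic_auth` would normally do the encoding; the hand-rolled Base64 below exists only to keep the example dependency-free.

```rust
const ALPHABET: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Standard Base64 with '=' padding (illustration only).
fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        let b0 = chunk[0] as u32;
        let b1 = *chunk.get(1).unwrap_or(&0) as u32;
        let b2 = *chunk.get(2).unwrap_or(&0) as u32;
        let n = (b0 << 16) | (b1 << 8) | b2;
        out.push(ALPHABET[(n >> 18) as usize & 63] as char);
        out.push(ALPHABET[(n >> 12) as usize & 63] as char);
        out.push(if chunk.len() > 1 {
            ALPHABET[(n >> 6) as usize & 63] as char
        } else {
            '='
        });
        out.push(if chunk.len() > 2 {
            ALPHABET[n as usize & 63] as char
        } else {
            '='
        });
    }
    out
}

/// Build "{host}/api/public/ingestion"; tolerating a trailing slash is an assumption.
pub fn ingestion_url(host: &str) -> String {
    format!("{}/api/public/ingestion", host.trim_end_matches('/'))
}

/// Build the Basic auth header value from (public_key, secret_key).
pub fn basic_auth_header(public_key: &str, secret_key: &str) -> String {
    let credentials = format!("{public_key}:{secret_key}");
    format!("Basic {}", base64_encode(credentials.as_bytes()))
}
```

The public key plays the role of the Basic auth username and the secret key the password, matching the (public_key, secret_key) pair named under Scope.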

Deferred

  • Tool-call → span mapping (SpanCreate/SpanUpdate): scaffolded but not yet populated.
  • Buffering pending events keyed by (connector_id, native_session_id) for late-bind scenarios.