diff --git a/Directory.Build.props b/Directory.Build.props index 2dae89f..6274189 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -9,7 +9,7 @@ -0.10.40 +0.10.42 diff --git a/src/RockBot.Agent/Program.cs b/src/RockBot.Agent/Program.cs index cffc511..00867a5 100644 --- a/src/RockBot.Agent/Program.cs +++ b/src/RockBot.Agent/Program.cs @@ -272,6 +272,7 @@ async Task BuildClientForTierAsync(LlmTierConfig config, string tie agent.WithFeedback(); agent.WithSkills(); agent.WithKnowledgeGraph(); + agent.WithFailureClusterStore(); agent.WithDreaming(); agent.AddToolHandler(); agent.AddMcpToolProxy(); diff --git a/src/RockBot.Host.Abstractions/ClusterKey.cs b/src/RockBot.Host.Abstractions/ClusterKey.cs new file mode 100644 index 0000000..9e29b3b --- /dev/null +++ b/src/RockBot.Host.Abstractions/ClusterKey.cs @@ -0,0 +1,28 @@ +namespace RockBot.Host; + +/// +/// Identity of a tool-failure cluster. Server and tool are normalised to +/// lowercase so case differences from MCP responses don't fragment clusters. +/// See design/self-repair.md Phase 5. +/// +/// MCP server name (lowercased on construction). +/// Tool name on that server (lowercased on construction). +/// Deterministic class of error — usually a missing field name extracted from the error string, or "unknown". +public sealed record ClusterKey(string Server, string Tool, string ErrorClass) +{ + public string Server { get; } = NormaliseLowerOrThrow(Server, nameof(Server)); + public string Tool { get; } = NormaliseLowerOrThrow(Tool, nameof(Tool)); + public string ErrorClass { get; } = ValidateOrThrow(ErrorClass, nameof(ErrorClass)); + + private static string NormaliseLowerOrThrow(string value, string paramName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(value, paramName); + return value.ToLowerInvariant(); + } + + private static string ValidateOrThrow(string value, string paramName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(value, paramName); + return value; + } +} diff --git a/src/RockBot.Host.Abstractions/FailureCluster.cs b/src/RockBot.Host.Abstractions/FailureCluster.cs new file mode 100644 index 0000000..a431d7d --- /dev/null +++ b/src/RockBot.Host.Abstractions/FailureCluster.cs @@ -0,0 +1,21 @@ +namespace RockBot.Host; + +/// +/// Aggregate state for a stream of post-recovery tool failures sharing the same +/// . Tracked in-process by +/// and persisted to the PVC so cluster history survives agent restarts. +/// See design/self-repair.md Phase 5. +/// +/// Cluster identity (server, tool, error class). +/// Total number of failures recorded for this cluster. +/// Distinct session ids that contributed at least one failure. Bounded by . +/// UTC timestamp of the first recorded failure. +/// UTC timestamp of the most recent recorded failure. +/// Most recent distinct error messages, oldest-first. Bounded by with each entry truncated. +public sealed record FailureCluster( + ClusterKey Key, + int Count, + IReadOnlySet SessionIds, + DateTimeOffset FirstSeen, + DateTimeOffset LastSeen, + IReadOnlyList SampleErrorMessages); diff --git a/src/RockBot.Host.Abstractions/FailureClusterOptions.cs b/src/RockBot.Host.Abstractions/FailureClusterOptions.cs new file mode 100644 index 0000000..7742c77 --- /dev/null +++ b/src/RockBot.Host.Abstractions/FailureClusterOptions.cs @@ -0,0 +1,59 @@ +namespace RockBot.Host; + +/// +/// Options for the failure cluster store. When is relative +/// it is resolved under , mirroring +/// . +/// +public sealed class FailureClusterOptions +{ + /// + /// Base directory for cluster state files. Defaults to "telemetry". + /// When relative, resolved under the agent profile base path + /// (/data/agent/telemetry in K8s). + /// + public string BasePath { get; set; } = "telemetry"; + + /// + /// How often the in-memory cluster state is flushed to a snapshot file and + /// the JSONL log truncated. Default 30 seconds. + /// + public TimeSpan FlushInterval { get; set; } = TimeSpan.FromSeconds(30); + + /// + /// Maximum number of sample error messages retained per cluster. Default 5. + /// Most recent messages are kept; older messages are dropped on overflow. + /// + public int MaxSampleMessages { get; set; } = 5; + + /// + /// Maximum length per sample error message (characters). Longer messages are + /// truncated with an ellipsis. Default 512. + /// + public int MaxSampleMessageLength { get; set; } = 512; + + /// + /// Maximum number of distinct session ids retained per cluster. Once this + /// cap is reached, additional sessions still increment + /// but do not grow the set. Default 64. + /// + public int MaxSessionIdsPerCluster { get; set; } = 64; + + /// + /// Minimum failure count for a cluster to be reported as escalatable. + /// Default 3 (matches the Phase 5 acceptance criterion). + /// + public int EscalationCountThreshold { get; set; } = 3; + + /// + /// Minimum number of distinct sessions for a cluster to be reported as + /// escalatable. Default 2. + /// + public int EscalationSessionThreshold { get; set; } = 2; + + /// + /// Maximum age of for a cluster to be + /// reported as escalatable. Default 24 hours. + /// + public TimeSpan EscalationWindow { get; set; } = TimeSpan.FromHours(24); +} diff --git a/src/RockBot.Host.Abstractions/IFailureClusterStore.cs b/src/RockBot.Host.Abstractions/IFailureClusterStore.cs new file mode 100644 index 0000000..6c34064 --- /dev/null +++ b/src/RockBot.Host.Abstractions/IFailureClusterStore.cs @@ -0,0 +1,46 @@ +namespace RockBot.Host; + +/// +/// Tracks tool failure clusters in-process for hot reads/writes, with PVC-backed +/// persistence for crash recovery. The MCP gateway records every post-recovery +/// failure here; the dream service reads clusters to drive repair tickets. +/// Auto-recovered calls are NOT recorded — they live in the recovery telemetry +/// metrics counter only. See design/self-repair.md Phase 5. +/// +public interface IFailureClusterStore +{ + /// + /// Increments or creates the cluster identified by , + /// adding (when non-null) to the set of sessions + /// that have produced this failure and appending + /// to the bounded sample buffer. + /// + /// Cluster identity. + /// Originating session, or null when the call was outside a session context. + /// Raw error text (truncated by the store). + /// Timestamp of the failure (typically ). + /// Cancellation token. + Task RecordAsync( + ClusterKey key, + string? sessionId, + string errorMessage, + DateTimeOffset at, + CancellationToken cancellationToken = default); + + /// + /// Returns a snapshot of every cluster currently tracked, ordered by + /// descending. + /// + Task> GetAllAsync(CancellationToken cancellationToken = default); + + /// + /// Returns the subset of clusters that meet the escalation thresholds + /// configured in — by default + /// Count >= 3 && SessionIds.Count >= 2 && (now - LastSeen) < 24h. + /// + /// The reference time used to evaluate the recency window. + /// Cancellation token. + Task> GetEscalatableAsync( + DateTimeOffset now, + CancellationToken cancellationToken = default); +} diff --git a/src/RockBot.Host/AgentMemoryExtensions.cs b/src/RockBot.Host/AgentMemoryExtensions.cs index 95494e7..50a16f0 100644 --- a/src/RockBot.Host/AgentMemoryExtensions.cs +++ b/src/RockBot.Host/AgentMemoryExtensions.cs @@ -192,6 +192,28 @@ public static AgentHostBuilder WithFeedback( return builder; } + /// + /// Registers the in-process, PVC-backed failure cluster store. Records every + /// post-recovery MCP tool failure so DreamService can spot recurring patterns + /// and open repair tickets. Opt-in — call after . + /// See design/self-repair.md Phase 5. + /// + public static AgentHostBuilder WithFailureClusterStore( + this AgentHostBuilder builder, + Action? configure = null) + { + if (configure is not null) + builder.Services.Configure(configure); + else + builder.Services.Configure(_ => { }); + + builder.Services.AddSingleton(); + builder.Services.AddSingleton(sp => sp.GetRequiredService()); + builder.Services.AddSingleton(sp => sp.GetRequiredService()); + + return builder; + } + /// /// Registers the file-backed knowledge graph store for entity-relationship reasoning. /// diff --git a/src/RockBot.Host/FailureErrorClassifier.cs b/src/RockBot.Host/FailureErrorClassifier.cs new file mode 100644 index 0000000..246514e --- /dev/null +++ b/src/RockBot.Host/FailureErrorClassifier.cs @@ -0,0 +1,44 @@ +using System.Text.RegularExpressions; + +namespace RockBot.Host; + +/// +/// Deterministically maps an MCP error string to an error class for +/// . Mirrors the patterns Phase 1 uses to +/// extract a missing required field name; falls back to "unknown" when +/// no pattern matches. +/// See design/self-repair.md Phase 5. +/// +internal static class FailureErrorClassifier +{ + public const string Unknown = "unknown"; + + private const RegexOptions Opts = + RegexOptions.Compiled | RegexOptions.IgnoreCase | RegexOptions.CultureInvariant; + + private static readonly Regex[] Patterns = + [ + new(@"Required\s+parameter\s+['""]?(?[A-Za-z_][A-Za-z0-9_]*)['""]?", Opts), + new(@"['""]?(?[A-Za-z_][A-Za-z0-9_]*)['""]?\s+is\s+required\b", Opts), + new(@"missing\s+required\s+argument\s+['""]?(?[A-Za-z_][A-Za-z0-9_]*)['""]?", Opts), + new(@"expected\s+field\s+['""]?(?[A-Za-z_][A-Za-z0-9_]*)['""]?", Opts), + new(@"['""]?(?[A-Za-z_][A-Za-z0-9_]*)['""]?\s*:\s*must\s+be\s+provided", Opts), + ]; + + public static string Classify(string? errorText) + { + if (string.IsNullOrWhiteSpace(errorText)) return Unknown; + + foreach (var rx in Patterns) + { + var m = rx.Match(errorText); + if (m.Success) + { + var name = m.Groups["f"].Value; + if (!string.IsNullOrEmpty(name)) return name; + } + } + + return Unknown; + } +} diff --git a/src/RockBot.Host/FileFailureClusterStore.cs b/src/RockBot.Host/FileFailureClusterStore.cs new file mode 100644 index 0000000..4628d0b --- /dev/null +++ b/src/RockBot.Host/FileFailureClusterStore.cs @@ -0,0 +1,445 @@ +using System.Collections.Concurrent; +using System.Text; +using System.Text.Json; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace RockBot.Host; + +/// +/// In-process failure cluster store with PVC-backed persistence. Hot reads/writes +/// land in a ; durability comes from +/// an append-only JSONL log of record events plus a periodic JSON snapshot of the +/// full state. On startup the snapshot is loaded then the JSONL is replayed; on +/// snapshot completion the JSONL is truncated to bound disk growth. +/// +/// See design/self-repair.md Phase 5. +/// +internal sealed class FileFailureClusterStore : IFailureClusterStore, IHostedService, IAsyncDisposable +{ + private static readonly JsonSerializerOptions JsonOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + PropertyNameCaseInsensitive = true + }; + + private const string SnapshotFileName = "failure-clusters.snapshot.json"; + private const string JsonlFileName = "failure-clusters.jsonl"; + + private readonly FailureClusterOptions _options; + private readonly ILogger _logger; + private readonly string _basePath; + private readonly string _snapshotPath; + private readonly string _jsonlPath; + + private readonly ConcurrentDictionary _clusters = new(); + + /// Serialises all file I/O — snapshot writes, JSONL appends, and JSONL truncation. + private readonly SemaphoreSlim _fileLock = new(1, 1); + + private Timer? _flushTimer; + private bool _loaded; + + public FileFailureClusterStore( + IOptions options, + IOptions profileOptions, + ILogger logger) + { + _options = options.Value; + _logger = logger; + _basePath = ResolvePath(_options.BasePath, profileOptions.Value.BasePath); + _snapshotPath = Path.Combine(_basePath, SnapshotFileName); + _jsonlPath = Path.Combine(_basePath, JsonlFileName); + Directory.CreateDirectory(_basePath); + _logger.LogInformation("Failure cluster store path: {Path}", _basePath); + } + + // ── IHostedService ──────────────────────────────────────────────────────── + + public async Task StartAsync(CancellationToken cancellationToken) + { + await LoadAsync(cancellationToken); + + if (_options.FlushInterval > TimeSpan.Zero) + { + _flushTimer = new Timer( + _ => _ = FlushAsync(CancellationToken.None), + state: null, + dueTime: _options.FlushInterval, + period: _options.FlushInterval); + } + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + if (_flushTimer is not null) + { + await _flushTimer.DisposeAsync(); + _flushTimer = null; + } + + // CancellationToken.None — the host's stopping token may already have been + // disposed by the time other hosted services finish shutting down, and + // SemaphoreSlim.WaitAsync(disposedToken) throws ObjectDisposedException. + // Shutdown flush is critical work; let it run to completion. + await FlushAsync(CancellationToken.None); + } + + public async ValueTask DisposeAsync() + { + if (_flushTimer is not null) + { + await _flushTimer.DisposeAsync(); + _flushTimer = null; + } + _fileLock.Dispose(); + } + + // ── IFailureClusterStore ────────────────────────────────────────────────── + + public async Task RecordAsync( + ClusterKey key, + string? sessionId, + string errorMessage, + DateTimeOffset at, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(key); + var truncated = Truncate(errorMessage ?? string.Empty, _options.MaxSampleMessageLength); + + // The in-memory mutation and the JSONL append run under the same file lock + // as FlushAsync. This guarantees that any record observable in the snapshot + // (in-memory state at the time of capture) has either already been written + // to the JSONL (and is about to be truncated) or has no JSONL entry at all. + // The startup replay still skips JSONL events older than the snapshot, so + // records that landed in the snapshot are never double-applied. + await _fileLock.WaitAsync(cancellationToken); + try + { + _clusters.AddOrUpdate( + key, + _ => CreateCluster(key, sessionId, truncated, at), + (_, existing) => MergeCluster(existing, sessionId, truncated, at)); + + var line = SerializeJsonlEvent(key, sessionId, truncated, at); + await File.AppendAllTextAsync(_jsonlPath, line, cancellationToken); + } + finally + { + _fileLock.Release(); + } + } + + public Task> GetAllAsync(CancellationToken cancellationToken = default) + { + IReadOnlyList snapshot = _clusters.Values + .OrderByDescending(c => c.LastSeen) + .ToList(); + return Task.FromResult(snapshot); + } + + public Task> GetEscalatableAsync( + DateTimeOffset now, + CancellationToken cancellationToken = default) + { + var window = _options.EscalationWindow; + var minCount = _options.EscalationCountThreshold; + var minSessions = _options.EscalationSessionThreshold; + + IReadOnlyList escalatable = _clusters.Values + .Where(c => c.Count >= minCount + && c.SessionIds.Count >= minSessions + && (now - c.LastSeen) < window) + .OrderByDescending(c => c.LastSeen) + .ToList(); + return Task.FromResult(escalatable); + } + + // ── Cluster mutation helpers ────────────────────────────────────────────── + + private FailureCluster CreateCluster( + ClusterKey key, string? sessionId, string errorMessage, DateTimeOffset at) + { + var sessions = new HashSet(StringComparer.Ordinal); + if (!string.IsNullOrEmpty(sessionId)) + sessions.Add(sessionId); + + var samples = new List { errorMessage }; + + return new FailureCluster( + Key: key, + Count: 1, + SessionIds: sessions, + FirstSeen: at, + LastSeen: at, + SampleErrorMessages: samples); + } + + private FailureCluster MergeCluster( + FailureCluster existing, string? sessionId, string errorMessage, DateTimeOffset at) + { + var sessions = new HashSet(existing.SessionIds, StringComparer.Ordinal); + if (!string.IsNullOrEmpty(sessionId) + && sessions.Count < _options.MaxSessionIdsPerCluster) + { + sessions.Add(sessionId); + } + + var samples = new List(existing.SampleErrorMessages) { errorMessage }; + if (samples.Count > _options.MaxSampleMessages) + { + samples.RemoveRange(0, samples.Count - _options.MaxSampleMessages); + } + + return existing with + { + Count = existing.Count + 1, + SessionIds = sessions, + FirstSeen = existing.FirstSeen <= at ? existing.FirstSeen : at, + LastSeen = existing.LastSeen >= at ? existing.LastSeen : at, + SampleErrorMessages = samples, + }; + } + + // ── Persistence ─────────────────────────────────────────────────────────── + + private async Task LoadAsync(CancellationToken cancellationToken) + { + await _fileLock.WaitAsync(cancellationToken); + try + { + if (_loaded) return; + + DateTimeOffset snapshotWrittenAt = DateTimeOffset.MinValue; + var loaded = 0; + + if (File.Exists(_snapshotPath)) + { + try + { + var json = await File.ReadAllTextAsync(_snapshotPath, cancellationToken); + var dto = JsonSerializer.Deserialize(json, JsonOptions); + if (dto is not null) + { + snapshotWrittenAt = dto.WrittenAt; + foreach (var c in dto.Clusters ?? []) + { + var cluster = ClusterFromDto(c); + if (cluster is not null) + { + _clusters[cluster.Key] = cluster; + loaded++; + } + } + } + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to load failure cluster snapshot from {Path}", _snapshotPath); + } + } + + var replayed = 0; + if (File.Exists(_jsonlPath)) + { + using var stream = new FileStream( + _jsonlPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); + using var reader = new StreamReader(stream, Encoding.UTF8); + string? line; + while ((line = await reader.ReadLineAsync(cancellationToken)) is not null) + { + if (string.IsNullOrWhiteSpace(line)) continue; + + JsonlEventDto? evt = null; + try + { + evt = JsonSerializer.Deserialize(line, JsonOptions); + } + catch (JsonException ex) + { + _logger.LogWarning(ex, + "Skipping malformed failure-cluster JSONL line in {Path}", _jsonlPath); + continue; + } + + if (evt is null || evt.Key is null) continue; + if (evt.At < snapshotWrittenAt) continue; // already in snapshot + + ClusterKey key; + try + { + key = new ClusterKey(evt.Key.Server, evt.Key.Tool, evt.Key.ErrorClass); + } + catch (ArgumentException) + { + continue; + } + + _clusters.AddOrUpdate( + key, + _ => CreateCluster(key, evt.SessionId, evt.ErrorMessage ?? string.Empty, evt.At), + (_, existing) => MergeCluster(existing, evt.SessionId, evt.ErrorMessage ?? string.Empty, evt.At)); + replayed++; + } + } + + _loaded = true; + + if (loaded > 0 || replayed > 0) + { + _logger.LogInformation( + "Loaded {Loaded} cluster(s) from snapshot and replayed {Replayed} JSONL event(s)", + loaded, replayed); + } + } + finally + { + _fileLock.Release(); + } + } + + private static string SerializeJsonlEvent( + ClusterKey key, string? sessionId, string errorMessage, DateTimeOffset at) + { + var evt = new JsonlEventDto + { + At = at, + Key = new ClusterKeyDto { Server = key.Server, Tool = key.Tool, ErrorClass = key.ErrorClass }, + SessionId = sessionId, + ErrorMessage = errorMessage, + }; + + return JsonSerializer.Serialize(evt, JsonOptions) + Environment.NewLine; + } + + /// + /// Writes the in-memory cluster state to the snapshot atomically and + /// truncates the JSONL log. Safe to call concurrently — serialised by + /// . + /// + internal async Task FlushAsync(CancellationToken cancellationToken) + { + await _fileLock.WaitAsync(cancellationToken); + try + { + var snapshot = new SnapshotDto + { + WrittenAt = DateTimeOffset.UtcNow, + Clusters = _clusters.Values + .OrderByDescending(c => c.LastSeen) + .Select(ClusterToDto) + .ToList(), + }; + + var json = JsonSerializer.Serialize(snapshot, JsonOptions); + var tempPath = _snapshotPath + ".tmp"; + await File.WriteAllTextAsync(tempPath, json, cancellationToken); + File.Move(tempPath, _snapshotPath, overwrite: true); + + // Truncate JSONL — events up to snapshot.WrittenAt are now durable in the snapshot. + if (File.Exists(_jsonlPath)) + { + using var fs = new FileStream( + _jsonlPath, FileMode.Truncate, FileAccess.Write, FileShare.None); + } + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to flush failure cluster snapshot"); + } + finally + { + _fileLock.Release(); + } + } + + // ── DTOs and conversions ────────────────────────────────────────────────── + + private sealed class SnapshotDto + { + public DateTimeOffset WrittenAt { get; set; } + public List? Clusters { get; set; } + } + + private sealed class ClusterDto + { + public ClusterKeyDto? Key { get; set; } + public int Count { get; set; } + public List? SessionIds { get; set; } + public DateTimeOffset FirstSeen { get; set; } + public DateTimeOffset LastSeen { get; set; } + public List? SampleErrorMessages { get; set; } + } + + private sealed class ClusterKeyDto + { + public string Server { get; set; } = string.Empty; + public string Tool { get; set; } = string.Empty; + public string ErrorClass { get; set; } = string.Empty; + } + + private sealed class JsonlEventDto + { + public DateTimeOffset At { get; set; } + public ClusterKeyDto? Key { get; set; } + public string? SessionId { get; set; } + public string? ErrorMessage { get; set; } + } + + private static ClusterDto ClusterToDto(FailureCluster c) => new() + { + Key = new ClusterKeyDto { Server = c.Key.Server, Tool = c.Key.Tool, ErrorClass = c.Key.ErrorClass }, + Count = c.Count, + SessionIds = c.SessionIds.ToList(), + FirstSeen = c.FirstSeen, + LastSeen = c.LastSeen, + SampleErrorMessages = c.SampleErrorMessages.ToList(), + }; + + private static FailureCluster? ClusterFromDto(ClusterDto dto) + { + if (dto.Key is null) return null; + + ClusterKey key; + try + { + key = new ClusterKey(dto.Key.Server, dto.Key.Tool, dto.Key.ErrorClass); + } + catch (ArgumentException) + { + return null; + } + + var sessions = new HashSet(dto.SessionIds ?? [], StringComparer.Ordinal); + var samples = dto.SampleErrorMessages ?? []; + + return new FailureCluster( + Key: key, + Count: dto.Count, + SessionIds: sessions, + FirstSeen: dto.FirstSeen, + LastSeen: dto.LastSeen, + SampleErrorMessages: samples); + } + + // ── Helpers ─────────────────────────────────────────────────────────────── + + private static string Truncate(string s, int max) + { + if (max <= 0 || s.Length <= max) return s; + return s[..max] + "…"; + } + + private static string ResolvePath(string path, string profileBasePath) + { + if (Path.IsPathRooted(path)) + return path; + + var baseDir = Path.IsPathRooted(profileBasePath) + ? profileBasePath + : Path.Combine(AppContext.BaseDirectory, profileBasePath); + + return Path.Combine(baseDir, path); + } +} diff --git a/src/RockBot.Tools.Mcp/McpManagementExecutor.cs b/src/RockBot.Tools.Mcp/McpManagementExecutor.cs index 23a197b..6299822 100644 --- a/src/RockBot.Tools.Mcp/McpManagementExecutor.cs +++ b/src/RockBot.Tools.Mcp/McpManagementExecutor.cs @@ -161,10 +161,13 @@ private async Task InvokeToolAsync(ToolInvokeRequest request // Always pass through recovery — it inspects both IsError=true responses // and IsError=false responses with embedded JSON error bodies (some MCP // servers report schema errors that way). Recovery short-circuits cheaply - // when the response is genuinely successful. + // when the response is genuinely successful. SessionId from the outer + // request is forwarded so post-recovery failures cluster by distinct + // sessions in IFailureClusterStore (Phase 5). if (_recovery is not null) { - response = await _recovery.RecoverAsync(serverName, toolName, innerRequest, response, ct); + response = await _recovery.RecoverAsync( + serverName, toolName, innerRequest, response, ct, sessionId: request.SessionId); } return response; diff --git a/src/RockBot.Tools.Mcp/Recovery/McpRecoveryExecutor.cs b/src/RockBot.Tools.Mcp/Recovery/McpRecoveryExecutor.cs index 6618c0d..8fb1c8e 100644 --- a/src/RockBot.Tools.Mcp/Recovery/McpRecoveryExecutor.cs +++ b/src/RockBot.Tools.Mcp/Recovery/McpRecoveryExecutor.cs @@ -42,6 +42,7 @@ public sealed class McpRecoveryExecutor private readonly IReadOnlyList _providers; private readonly StageBLlmFiller? _stageB; private readonly ICapabilityClaimWriter? _capabilityClaimWriter; + private readonly IFailureClusterStore? _failureClusterStore; private readonly ILogger _logger; public McpRecoveryExecutor( @@ -49,12 +50,14 @@ public McpRecoveryExecutor( IEnumerable providers, ILogger logger, StageBLlmFiller? stageB = null, - ICapabilityClaimWriter? capabilityClaimWriter = null) + ICapabilityClaimWriter? capabilityClaimWriter = null, + IFailureClusterStore? failureClusterStore = null) { _invoke = invoke; _providers = providers.ToList(); _stageB = stageB; _capabilityClaimWriter = capabilityClaimWriter; + _failureClusterStore = failureClusterStore; _logger = logger; } @@ -64,13 +67,17 @@ public McpRecoveryExecutor( /// successful recovered response. Exhausted-recovery responses carry an /// annotated trail in . /// + /// Originating session id (from the outer + /// mcp_invoke_tool request). Forwarded to the failure cluster store so + /// post-recovery failures can be grouped by distinct sessions. May be null. public Task RecoverAsync( string serverName, string toolName, ToolInvokeRequest innerRequest, ToolInvokeResponse response, - CancellationToken ct) => - TryRecoverAsync(serverName, toolName, innerRequest, response, depth: 0, ct); + CancellationToken ct, + string? sessionId = null) => + TryRecoverAsync(serverName, toolName, innerRequest, response, depth: 0, sessionId, ct); private async Task TryRecoverAsync( string serverName, @@ -78,6 +85,7 @@ private async Task TryRecoverAsync( ToolInvokeRequest innerRequest, ToolInvokeResponse response, int depth, + string? sessionId, CancellationToken ct) { if (depth >= MaxChainDepth) @@ -92,6 +100,7 @@ await EmitCapabilityClaimAsync( statement: $"recovery-exhausted: chain depth {depth} reached for {serverName}/{toolName}", evidence: chainErrorText, ct); + await RecordFailureAsync(serverName, toolName, sessionId, chainErrorText, fieldHint: null, ct); return Annotate(response, $"chain-exhausted at depth {depth}"); } @@ -99,13 +108,23 @@ await EmitCapabilityClaimAsync( if (errorText is null) return response; if (!SchemaErrorPatterns.TryExtractMissingField(errorText, out var fieldName)) + { + // Non-schema error — recovery has no field to fill. Still a post-recovery + // failure from the gateway's perspective; record under "unknown" so + // DreamService can spot recurring auth/network/server-side failures the + // same way it spots schema-pattern failures. + await RecordFailureAsync(serverName, toolName, sessionId, errorText, fieldHint: null, ct); return response; + } var existingArgs = McpToolExecutor.ParseArguments(innerRequest.Arguments); if (existingArgs.ContainsKey(fieldName)) { // The field is already present — the error is something else despite matching - // the pattern (e.g., the value was wrong, not missing). Don't loop. + // the pattern (e.g., the value was wrong, not missing). Don't loop. Cluster + // these as "unknown" so the merged group reflects "server complains a field + // is required even though it's in args" rather than fragmenting by field. + await RecordFailureAsync(serverName, toolName, sessionId, errorText, fieldHint: "unknown", ct); return response; } @@ -168,7 +187,7 @@ await EmitCapabilityClaimAsync( ToolName = toolName, Arguments = JsonSerializer.Serialize(mergedArgs) }; - return await TryRecoverAsync(serverName, toolName, nextRequest, retryResponse, depth + 1, ct); + return await TryRecoverAsync(serverName, toolName, nextRequest, retryResponse, depth + 1, sessionId, ct); } // Retry still failed and isn't chainable. @@ -177,6 +196,7 @@ await EmitCapabilityClaimAsync( statement: $"recovery-exhausted: Stage A provider {providerName} resolved field {fieldName} but the call still failed", evidence: retryError ?? errorText, ct); + await RecordFailureAsync(serverName, toolName, sessionId, retryError ?? errorText, fieldHint: null, ct); return Annotate(response, $"stageA={providerName} retry-failed: {Truncate(retryResponse.Content)}"); } @@ -218,7 +238,7 @@ await EmitCapabilityClaimAsync( ToolName = toolName, Arguments = JsonSerializer.Serialize(mergedArgs) }; - return await TryRecoverAsync(serverName, toolName, nextRequest, retryResponse, depth + 1, ct); + return await TryRecoverAsync(serverName, toolName, nextRequest, retryResponse, depth + 1, sessionId, ct); } await EmitCapabilityClaimAsync( @@ -226,6 +246,7 @@ await EmitCapabilityClaimAsync( statement: $"recovery-exhausted: Stage B filled field {fieldName} but the call still failed", evidence: retryError ?? errorText, ct); + await RecordFailureAsync(serverName, toolName, sessionId, retryError ?? errorText, fieldHint: null, ct); return Annotate(response, $"stageA=no-provider; stageB=filled retry-failed: {Truncate(retryResponse.Content)}"); } @@ -238,12 +259,14 @@ await EmitCapabilityClaimAsync( statement: $"recovery-exhausted: Stage B could not fill field {fieldName}", evidence: errorText, ct); + await RecordFailureAsync(serverName, toolName, sessionId, errorText, fieldHint: fieldName, ct); return Annotate(response, "stageA=no-provider; stageB=fill-failed"); } sw.Stop(); RecordTelemetry(serverName, toolName, fieldName, "A", recovered: false, "no-provider", sw.Elapsed.TotalMilliseconds); + await RecordFailureAsync(serverName, toolName, sessionId, errorText, fieldHint: fieldName, ct); return Annotate(response, "stageA=no-provider; stageB=disabled"); } @@ -479,6 +502,62 @@ private static void RecordTelemetry( RecoveryDiagnostics.Duration.Record(durationMs, tags); } + /// + /// Phase 5 producer side: records a post-recovery failure into the cluster store + /// so DreamService can spot recurring (server, tool, error-class) patterns and + /// open repair tickets. Auto-recovered calls are not recorded — those are + /// covered by metrics. Error class is the + /// missing field name from , or + /// "unknown" if no pattern matches. Best-effort; failures here never + /// break the recovery path. + /// + private async Task RecordFailureAsync( + string serverName, + string toolName, + string? sessionId, + string? errorText, + string? fieldHint, + CancellationToken ct) + { + if (_failureClusterStore is null) return; + + var errorClass = !string.IsNullOrEmpty(fieldHint) + ? fieldHint + : ExtractErrorClass(errorText); + + ClusterKey key; + try + { + key = new ClusterKey(serverName, toolName, errorClass); + } + catch (ArgumentException ex) + { + _logger.LogWarning(ex, + "Skipping failure cluster record for {Server}/{Tool} — invalid key components", + serverName, toolName); + return; + } + + try + { + await _failureClusterStore.RecordAsync( + key, sessionId, errorText ?? string.Empty, DateTimeOffset.UtcNow, ct); + } + catch (OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Failed to record failure cluster for {Server}/{Tool} class {ErrorClass}", + serverName, toolName, errorClass); + } + } + + private static string ExtractErrorClass(string? errorText) => + SchemaErrorPatterns.TryExtractMissingField(errorText, out var fieldName) ? fieldName : "unknown"; + /// /// Phase 2 producer side: when recovery has been attempted but exhausted, write /// a falsifiable capability claim against (server, tool, current arguments). diff --git a/tests/RockBot.Host.Tests/FailureClusterTests.cs b/tests/RockBot.Host.Tests/FailureClusterTests.cs new file mode 100644 index 0000000..eaa1f02 --- /dev/null +++ b/tests/RockBot.Host.Tests/FailureClusterTests.cs @@ -0,0 +1,33 @@ +namespace RockBot.Host.Tests; + +[TestClass] +public class FailureClusterTests +{ + [TestMethod] + public void ClusterKey_LowercasesServerAndTool() + { + var key = new ClusterKey("Calendar-MCP", "Get_Calendar_Events", "timeZone"); + + Assert.AreEqual("calendar-mcp", key.Server); + Assert.AreEqual("get_calendar_events", key.Tool); + Assert.AreEqual("timeZone", key.ErrorClass); + } + + [TestMethod] + public void ClusterKey_RejectsBlankComponents() + { + Assert.ThrowsExactly(() => new ClusterKey("", "tool", "class")); + Assert.ThrowsExactly(() => new ClusterKey("server", " ", "class")); + Assert.ThrowsExactly(() => new ClusterKey("server", "tool", "")); + } + + [TestMethod] + public void ClusterKey_EqualsByCanonicalisedComponents() + { + var a = new ClusterKey("Calendar-MCP", "Tool", "class"); + var b = new ClusterKey("calendar-mcp", "tool", "class"); + + Assert.AreEqual(a, b); + Assert.AreEqual(a.GetHashCode(), b.GetHashCode()); + } +} diff --git a/tests/RockBot.Host.Tests/FailureErrorClassifierTests.cs b/tests/RockBot.Host.Tests/FailureErrorClassifierTests.cs new file mode 100644 index 0000000..34f0226 --- /dev/null +++ b/tests/RockBot.Host.Tests/FailureErrorClassifierTests.cs @@ -0,0 +1,39 @@ +namespace RockBot.Host.Tests; + +[TestClass] +public class FailureErrorClassifierTests +{ + [TestMethod] + [DataRow("Required parameter 'timeZone'", "timeZone")] + [DataRow("Required parameter \"accountId\"", "accountId")] + [DataRow("Required parameter startDate", "startDate")] + [DataRow("'timeZone' is required", "timeZone")] + [DataRow("accountId is required", "accountId")] + [DataRow("missing required argument 'endDate'", "endDate")] + [DataRow("missing required argument tz", "tz")] + [DataRow("expected field 'orgId'", "orgId")] + [DataRow("expected field projectId", "projectId")] + [DataRow("'timeZone': must be provided", "timeZone")] + [DataRow("siteId: must be provided", "siteId")] + public void Classify_ExtractsMissingFieldName(string error, string expected) + { + Assert.AreEqual(expected, FailureErrorClassifier.Classify(error)); + } + + [TestMethod] + [DataRow("internal server error 500")] + [DataRow("network timeout")] + [DataRow("unauthorized")] + [DataRow("")] + [DataRow(null)] + public void Classify_FallsBackToUnknown(string? error) + { + Assert.AreEqual(FailureErrorClassifier.Unknown, FailureErrorClassifier.Classify(error)); + } + + [TestMethod] + public void Classify_IsCaseInsensitiveLikePhase1Patterns() + { + Assert.AreEqual("X", FailureErrorClassifier.Classify("REQUIRED PARAMETER 'X'")); + } +} diff --git a/tests/RockBot.Host.Tests/FileFailureClusterStoreTests.cs b/tests/RockBot.Host.Tests/FileFailureClusterStoreTests.cs new file mode 100644 index 0000000..aa8f846 --- /dev/null +++ b/tests/RockBot.Host.Tests/FileFailureClusterStoreTests.cs @@ -0,0 +1,276 @@ +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; + +namespace RockBot.Host.Tests; + +[TestClass] +public class FileFailureClusterStoreTests +{ + private string _tempDir = null!; + + [TestInitialize] + public void Setup() + { + _tempDir = Path.Combine(Path.GetTempPath(), "rockbot-cluster-test-" + Guid.NewGuid().ToString("N")); + } + + [TestCleanup] + public void Cleanup() + { + if (Directory.Exists(_tempDir)) + Directory.Delete(_tempDir, recursive: true); + } + + [TestMethod] + public async Task RecordAsync_FirstFailure_CreatesClusterWithCountOne() + { + var store = CreateStore(); + var key = new ClusterKey("calendar-mcp", "get_calendar_events", "timeZone"); + + await store.RecordAsync(key, "session-1", "timeZone is required", DateTimeOffset.UtcNow); + + var all = await store.GetAllAsync(); + Assert.AreEqual(1, all.Count); + var cluster = all[0]; + Assert.AreEqual(key, cluster.Key); + Assert.AreEqual(1, cluster.Count); + CollectionAssert.AreEquivalent(new[] { "session-1" }, cluster.SessionIds.ToArray()); + Assert.AreEqual(1, cluster.SampleErrorMessages.Count); + Assert.AreEqual("timeZone is required", cluster.SampleErrorMessages[0]); + } + + [TestMethod] + public async Task GetEscalatable_ThreeFailuresAcrossTwoSessions_InWindow_ReturnsCluster() + { + var store = CreateStore(); + var key = new ClusterKey("calendar-mcp", "get_calendar_events", "timeZone"); + var now = DateTimeOffset.UtcNow; + + await store.RecordAsync(key, "session-1", "timeZone is required", now); + await store.RecordAsync(key, "session-1", "timeZone is required", now); + await store.RecordAsync(key, "session-2", "timeZone is required", now); + + var escalatable = await store.GetEscalatableAsync(now); + Assert.AreEqual(1, escalatable.Count); + Assert.AreEqual(3, escalatable[0].Count); + Assert.AreEqual(2, escalatable[0].SessionIds.Count); + } + + [TestMethod] + public async Task GetEscalatable_BelowCountThreshold_ReturnsEmpty() + { + var store = CreateStore(); + var key = new ClusterKey("calendar-mcp", "get_calendar_events", "timeZone"); + var now = DateTimeOffset.UtcNow; + + await store.RecordAsync(key, "session-1", "err", now); + await store.RecordAsync(key, "session-2", "err", now); + + var escalatable = await store.GetEscalatableAsync(now); + Assert.AreEqual(0, escalatable.Count); + } + + [TestMethod] + public async Task GetEscalatable_OnlyOneSession_ReturnsEmpty() + { + var store = CreateStore(); + var key = new ClusterKey("calendar-mcp", "get_calendar_events", "timeZone"); + var now = DateTimeOffset.UtcNow; + + for (var i = 0; i < 5; i++) + await store.RecordAsync(key, "session-1", "err", now); + + var escalatable = await store.GetEscalatableAsync(now); + Assert.AreEqual(0, escalatable.Count); + } + + [TestMethod] + public async Task GetEscalatable_OutsideRecencyWindow_ReturnsEmpty() + { + var store = CreateStore(); + var key = new ClusterKey("calendar-mcp", "get_calendar_events", "timeZone"); + var twoDaysAgo = DateTimeOffset.UtcNow.AddDays(-2); + + await store.RecordAsync(key, "session-1", "err", twoDaysAgo); + await store.RecordAsync(key, "session-1", "err", twoDaysAgo); + await store.RecordAsync(key, "session-2", "err", twoDaysAgo); + + var escalatable = await store.GetEscalatableAsync(DateTimeOffset.UtcNow); + Assert.AreEqual(0, escalatable.Count); + } + + [TestMethod] + public async Task RecordAsync_BoundsSampleMessagesToFiveMostRecent() + { + var store = CreateStore(); + var key = new ClusterKey("svr", "tool", "field"); + var now = DateTimeOffset.UtcNow; + + for (var i = 0; i < 8; i++) + await store.RecordAsync(key, "s", $"error-{i}", now); + + var cluster = (await store.GetAllAsync())[0]; + Assert.AreEqual(8, cluster.Count); + Assert.AreEqual(5, cluster.SampleErrorMessages.Count); + // Most-recent-kept order: errors 3..7 + CollectionAssert.AreEqual( + new[] { "error-3", "error-4", "error-5", "error-6", "error-7" }, + cluster.SampleErrorMessages.ToArray()); + } + + [TestMethod] + public async Task RecordAsync_TruncatesLongErrorMessage() + { + var store = CreateStore(); + var key = new ClusterKey("svr", "tool", "field"); + var longMessage = new string('x', 1024); + + await store.RecordAsync(key, "s", longMessage, DateTimeOffset.UtcNow); + + var sample = (await store.GetAllAsync())[0].SampleErrorMessages[0]; + Assert.IsTrue(sample.Length <= 513, $"sample length was {sample.Length}"); + Assert.IsTrue(sample.EndsWith("…")); + } + + [TestMethod] + public async Task RecordAsync_NullSessionIdDoesNotPolluteSessionSet() + { + var store = CreateStore(); + var key = new ClusterKey("svr", "tool", "field"); + var now = DateTimeOffset.UtcNow; + + await store.RecordAsync(key, sessionId: null, "err", now); + await store.RecordAsync(key, sessionId: null, "err", now); + + var cluster = (await store.GetAllAsync())[0]; + Assert.AreEqual(2, cluster.Count); + Assert.AreEqual(0, cluster.SessionIds.Count); + } + + [TestMethod] + public async Task Persistence_SnapshotAndJsonl_RestoreClusterState() + { + var key = new ClusterKey("svr", "tool", "field"); + var t0 = DateTimeOffset.UtcNow; + + // First instance: record + flush. + var first = CreateStore(); + await first.RecordAsync(key, "session-1", "err1", t0); + await first.RecordAsync(key, "session-2", "err2", t0); + await first.FlushAsync(CancellationToken.None); + + // Record another after flush — lands in JSONL only. Real callers pass + // DateTimeOffset.UtcNow at the moment of recording, which is always after + // the snapshot's WrittenAt; mirror that here. + await first.RecordAsync(key, "session-3", "err3", DateTimeOffset.UtcNow.AddSeconds(1)); + + // Second instance: should load both snapshot + JSONL replay. + var second = CreateStore(); + await second.StartAsync(CancellationToken.None); + + var clusters = await second.GetAllAsync(); + Assert.AreEqual(1, clusters.Count); + Assert.AreEqual(3, clusters[0].Count); + CollectionAssert.AreEquivalent( + new[] { "session-1", "session-2", "session-3" }, + clusters[0].SessionIds.ToArray()); + + await second.StopAsync(CancellationToken.None); + } + + [TestMethod] + public async Task Flush_TruncatesJsonl() + { + var key = new ClusterKey("svr", "tool", "field"); + + var store = CreateStore(); + await store.RecordAsync(key, "s1", "err", DateTimeOffset.UtcNow); + await store.RecordAsync(key, "s2", "err", DateTimeOffset.UtcNow); + + var jsonlPath = Path.Combine(_tempDir, "failure-clusters.jsonl"); + Assert.IsTrue(new FileInfo(jsonlPath).Length > 0, "jsonl should have content before flush"); + + await store.FlushAsync(CancellationToken.None); + + Assert.AreEqual(0, new FileInfo(jsonlPath).Length, "jsonl should be truncated after flush"); + + var snapshotPath = Path.Combine(_tempDir, "failure-clusters.snapshot.json"); + Assert.IsTrue(File.Exists(snapshotPath), "snapshot should exist after flush"); + Assert.IsTrue(new FileInfo(snapshotPath).Length > 0, "snapshot should have content"); + } + + [TestMethod] + public async Task Persistence_CorruptJsonlLine_IsSkipped() + { + var key = new ClusterKey("svr", "tool", "field"); + var t0 = DateTimeOffset.UtcNow; + + var first = CreateStore(); + await first.RecordAsync(key, "session-1", "err1", t0); + await first.FlushAsync(CancellationToken.None); + await first.RecordAsync(key, "session-2", "err2", DateTimeOffset.UtcNow.AddSeconds(1)); + + // Append a malformed line that should be ignored on replay. + var jsonlPath = Path.Combine(_tempDir, "failure-clusters.jsonl"); + await File.AppendAllTextAsync(jsonlPath, "not-valid-json{{{" + Environment.NewLine); + + var second = CreateStore(); + await second.StartAsync(CancellationToken.None); + + var clusters = await second.GetAllAsync(); + Assert.AreEqual(1, clusters.Count); + Assert.AreEqual(2, clusters[0].Count); + + await second.StopAsync(CancellationToken.None); + } + + [TestMethod] + public async Task Persistence_JsonlEventsBeforeSnapshotTime_AreNotDoubleApplied() + { + var key = new ClusterKey("svr", "tool", "field"); + + var first = CreateStore(); + // Record an old event (snapshot included it). + await first.RecordAsync(key, "session-1", "err1", DateTimeOffset.UtcNow.AddMinutes(-5)); + await first.FlushAsync(CancellationToken.None); + // Old JSONL is truncated after flush. Now record a new event. + await first.RecordAsync(key, "session-2", "err2", DateTimeOffset.UtcNow); + + var second = CreateStore(); + await second.StartAsync(CancellationToken.None); + + var clusters = await second.GetAllAsync(); + // Snapshot has 1 (session-1), JSONL has 1 (session-2). Total = 2 (not 3). + Assert.AreEqual(2, clusters[0].Count); + + await second.StopAsync(CancellationToken.None); + } + + [TestMethod] + public async Task GetAllAsync_OrdersByLastSeenDescending() + { + var store = CreateStore(); + var older = new ClusterKey("svr", "tool", "older"); + var newer = new ClusterKey("svr", "tool", "newer"); + var t1 = DateTimeOffset.UtcNow.AddMinutes(-10); + var t2 = DateTimeOffset.UtcNow; + + await store.RecordAsync(older, "s", "e", t1); + await store.RecordAsync(newer, "s", "e", t2); + + var all = await store.GetAllAsync(); + Assert.AreEqual(2, all.Count); + Assert.AreEqual(newer, all[0].Key); + Assert.AreEqual(older, all[1].Key); + } + + private FileFailureClusterStore CreateStore() => new( + Options.Create(new FailureClusterOptions + { + BasePath = _tempDir, + // Disable timer-based flush for tests; we drive it explicitly. + FlushInterval = TimeSpan.Zero, + }), + Options.Create(new AgentProfileOptions()), + NullLogger.Instance); +} diff --git a/tests/RockBot.Tools.Tests/Recovery/McpRecoveryExecutorFailureClusterTests.cs b/tests/RockBot.Tools.Tests/Recovery/McpRecoveryExecutorFailureClusterTests.cs new file mode 100644 index 0000000..d6ea50b --- /dev/null +++ b/tests/RockBot.Tools.Tests/Recovery/McpRecoveryExecutorFailureClusterTests.cs @@ -0,0 +1,400 @@ +using System.Text.Json; +using Microsoft.Extensions.Logging.Abstractions; +using RockBot.Host; +using RockBot.Tools.Mcp; +using RockBot.Tools.Mcp.Recovery; + +namespace RockBot.Tools.Tests.Recovery; + +/// +/// Phase 5 wire-in: the recovery executor records every post-recovery failure +/// into . Auto-recovered calls do NOT record. +/// +[TestClass] +public class McpRecoveryExecutorFailureClusterTests +{ + [TestMethod] + public async Task AutoRecovered_StageA_DoesNotRecord() + { + var store = new RecordingClusterStore(); + var provider = new FakeProvider( + (_, _, f) => f == "timeZone", + _ => new ResolvedDefault("America/Chicago")); + // Retry succeeds — recovery successful, nothing recorded. + McpInvokeDelegate invoke = (r, _, _) => Task.FromResult(Ok(r, "events: []")); + + var exec = new McpRecoveryExecutor( + invoke, [provider], NullLogger.Instance, + failureClusterStore: store); + + var req = new ToolInvokeRequest { ToolCallId = "1", ToolName = "get", Arguments = "{}" }; + var failed = Err(req, "Required parameter 'timeZone' was not provided"); + + var result = await exec.RecoverAsync("srv", "get", req, failed, default, + sessionId: "session-A"); + + Assert.IsFalse(result.IsError); + Assert.AreEqual(0, store.Records.Count, "auto-recovered calls must not enter the cluster store"); + } + + [TestMethod] + public async Task StageA_RetryFailed_RecordsClusterWithFieldClassAndSessionId() + { + var store = new RecordingClusterStore(); + var provider = new FakeProvider( + (_, _, f) => f == "timeZone", + _ => new ResolvedDefault("America/Chicago")); + // Retry surfaces a non-schema failure that doesn't chain. + McpInvokeDelegate invoke = (r, _, _) => + Task.FromResult(Err(r, "permission denied")); + + var exec = new McpRecoveryExecutor( + invoke, [provider], NullLogger.Instance, + failureClusterStore: store); + + var req = new ToolInvokeRequest { ToolCallId = "1", ToolName = "get_calendar_events", Arguments = "{}" }; + var failed = Err(req, "Required parameter 'timeZone' was not provided"); + + var result = await exec.RecoverAsync( + "calendar-mcp", "get_calendar_events", req, failed, default, + sessionId: "session-A"); + + Assert.IsTrue(result.IsError); + Assert.AreEqual(1, store.Records.Count); + var rec = store.Records[0]; + Assert.AreEqual("calendar-mcp", rec.Key.Server); + Assert.AreEqual("get_calendar_events", rec.Key.Tool); + Assert.AreEqual("unknown", rec.Key.ErrorClass, + "Stage A retry-failure surfaces the secondary error (permission denied) which is non-schema → unknown class"); + Assert.AreEqual("session-A", rec.SessionId); + } + + [TestMethod] + public async Task StageB_FillFailed_RecordsClusterWithFieldClass() + { + var store = new RecordingClusterStore(); + var stageB = new NullFiller(); + McpInvokeDelegate invoke = (r, _, _) => Task.FromResult(Ok(r, "ok")); + + var exec = new McpRecoveryExecutor( + invoke, [], NullLogger.Instance, + stageB: stageB.Filler, failureClusterStore: store); + + var req = new ToolInvokeRequest { ToolCallId = "1", ToolName = "do", Arguments = "{}" }; + var failed = Err(req, "Required parameter 'novelField'"); + + var result = await exec.RecoverAsync("synthetic", "do", req, failed, default, + sessionId: "session-B"); + + Assert.IsTrue(result.IsError); + Assert.AreEqual(1, store.Records.Count); + Assert.AreEqual("novelField", store.Records[0].Key.ErrorClass); + Assert.AreEqual("session-B", store.Records[0].SessionId); + } + + [TestMethod] + public async Task StageB_RetryFailed_RecordsCluster() + { + var store = new RecordingClusterStore(); + var stageB = new CannedFiller("\"value\""); + McpInvokeDelegate invoke = (r, _, _) => Task.FromResult(Err(r, "permission denied")); + + var exec = new McpRecoveryExecutor( + invoke, [], NullLogger.Instance, + stageB: stageB.Filler, failureClusterStore: store); + + var req = new ToolInvokeRequest { ToolCallId = "1", ToolName = "do", Arguments = "{}" }; + var failed = Err(req, "Required parameter 'novelField'"); + + var result = await exec.RecoverAsync("synthetic", "do", req, failed, default, + sessionId: "session-C"); + + Assert.IsTrue(result.IsError); + Assert.AreEqual(1, store.Records.Count); + // retryError ("permission denied") doesn't match schema patterns → unknown class. + Assert.AreEqual("unknown", store.Records[0].Key.ErrorClass); + } + + [TestMethod] + public async Task NoStageAProvider_NoStageB_RecordsCluster() + { + var store = new RecordingClusterStore(); + McpInvokeDelegate invoke = (r, _, _) => Task.FromResult(Ok(r, "")); + + var exec = new McpRecoveryExecutor( + invoke, providers: [], NullLogger.Instance, + failureClusterStore: store); + + var req = new ToolInvokeRequest { ToolCallId = "1", ToolName = "do", Arguments = "{}" }; + var failed = Err(req, "Required parameter 'mysteryField'"); + + var result = await exec.RecoverAsync("synthetic", "do", req, failed, default, + sessionId: "session-D"); + + Assert.IsTrue(result.IsError); + Assert.AreEqual(1, store.Records.Count); + Assert.AreEqual("mysteryField", store.Records[0].Key.ErrorClass); + } + + [TestMethod] + public async Task ChainExhausted_RecordsCluster() + { + // Chain through MaxChainDepth (4) iterations by filling a different field + // each retry, then surface a schema error on the final retry that triggers + // chain-exhaustion at depth 4. + var store = new RecordingClusterStore(); + var fields = new[] { "f1", "f2", "f3", "f4", "f5" }; + var i = 0; + var provider = new FakeProvider( + (_, _, _) => true, + ctx => new ResolvedDefault($"v-{ctx.FieldName}")); + + McpInvokeDelegate invoke = (r, _, _) => + { + // Each retry surfaces the next missing field, until i exhausts. + i++; + if (i >= fields.Length) + return Task.FromResult(Err(r, $"Required parameter '{fields[i - 1]}'")); + return Task.FromResult(Err(r, $"Required parameter '{fields[i]}'")); + }; + + var exec = new McpRecoveryExecutor( + invoke, [provider], NullLogger.Instance, + failureClusterStore: store); + + var req = new ToolInvokeRequest { ToolCallId = "1", ToolName = "do", Arguments = "{}" }; + var failed = Err(req, $"Required parameter '{fields[0]}'"); + + var result = await exec.RecoverAsync("synthetic", "do", req, failed, default, + sessionId: "session-E"); + + Assert.IsTrue(result.IsError); + StringAssert.Contains(result.Content, "chain-exhausted"); + Assert.IsTrue(store.Records.Count >= 1, "chain exhaustion must record at least one cluster entry"); + } + + [TestMethod] + public async Task NonSchemaError_RecordsClusterAsUnknown() + { + // Error string doesn't match any of Phase 1's "missing required field" + // patterns — recovery has no field to fill, but the failure should still + // land in the cluster store under errorClass="unknown" so DreamService + // can spot recurring auth/network/server-side failures. + var store = new RecordingClusterStore(); + McpInvokeDelegate invoke = (r, _, _) => Task.FromResult(Ok(r, "")); + + var exec = new McpRecoveryExecutor( + invoke, [], NullLogger.Instance, + failureClusterStore: store); + + var req = new ToolInvokeRequest { ToolCallId = "1", ToolName = "get_generation", Arguments = "{}" }; + var failed = Err(req, "An error occurred invoking 'get_generation'."); + + var result = await exec.RecoverAsync("openrouter", "get_generation", req, failed, default, + sessionId: "session-X"); + + Assert.IsTrue(result.IsError); + Assert.AreEqual(1, store.Records.Count); + Assert.AreEqual("unknown", store.Records[0].Key.ErrorClass); + Assert.AreEqual("openrouter", store.Records[0].Key.Server); + Assert.AreEqual("get_generation", store.Records[0].Key.Tool); + Assert.AreEqual("session-X", store.Records[0].SessionId); + } + + [TestMethod] + public async Task FieldAlreadyInArgs_RecordsClusterAsUnknown() + { + // The error matches a schema pattern (extracts "timeZone") but timeZone + // is already in the request arguments — the actual error is about + // something else. Don't loop on recovery; record under "unknown" so + // these "server says X is required but X is present" failures cluster + // together rather than fragmenting by the misleading field name. + var store = new RecordingClusterStore(); + McpInvokeDelegate invoke = (r, _, _) => Task.FromResult(Ok(r, "")); + + var exec = new McpRecoveryExecutor( + invoke, [], NullLogger.Instance, + failureClusterStore: store); + + var req = new ToolInvokeRequest + { + ToolCallId = "1", ToolName = "get", + Arguments = """{"timeZone":"UTC"}""" + }; + var failed = Err(req, "Required parameter 'timeZone' not satisfied"); + + var result = await exec.RecoverAsync("srv", "get", req, failed, default, + sessionId: "session-Y"); + + Assert.IsTrue(result.IsError); + Assert.AreEqual(1, store.Records.Count); + Assert.AreEqual("unknown", store.Records[0].Key.ErrorClass); + } + + [TestMethod] + public async Task NullSessionId_StillRecords() + { + var store = new RecordingClusterStore(); + McpInvokeDelegate invoke = (r, _, _) => Task.FromResult(Ok(r, "")); + + var exec = new McpRecoveryExecutor( + invoke, [], NullLogger.Instance, + failureClusterStore: store); + + var req = new ToolInvokeRequest { ToolCallId = "1", ToolName = "do", Arguments = "{}" }; + var failed = Err(req, "Required parameter 'x'"); + + await exec.RecoverAsync("synthetic", "do", req, failed, default, sessionId: null); + + Assert.AreEqual(1, store.Records.Count); + Assert.IsNull(store.Records[0].SessionId); + } + + [TestMethod] + public async Task NoStore_RecoveryStillWorks() + { + // Phase-1 contract: the executor must work without a cluster store. + var provider = new FakeProvider( + (_, _, f) => f == "timeZone", + _ => new ResolvedDefault("America/Chicago")); + McpInvokeDelegate invoke = (r, _, _) => Task.FromResult(Ok(r, "events")); + + var exec = new McpRecoveryExecutor( + invoke, [provider], NullLogger.Instance); + + var req = new ToolInvokeRequest { ToolCallId = "1", ToolName = "get", Arguments = "{}" }; + var failed = Err(req, "Required parameter 'timeZone'"); + + var result = await exec.RecoverAsync("srv", "get", req, failed, default); + + Assert.IsFalse(result.IsError); + } + + [TestMethod] + public async Task StoreThrows_DoesNotBreakRecovery() + { + var store = new ThrowingClusterStore(); + McpInvokeDelegate invoke = (r, _, _) => Task.FromResult(Ok(r, "")); + + var exec = new McpRecoveryExecutor( + invoke, [], NullLogger.Instance, + failureClusterStore: store); + + var req = new ToolInvokeRequest { ToolCallId = "1", ToolName = "do", Arguments = "{}" }; + var failed = Err(req, "Required parameter 'x'"); + + // Must not bubble the store's exception — recovery is the priority path. + var result = await exec.RecoverAsync("synthetic", "do", req, failed, default, + sessionId: "session"); + + Assert.IsTrue(result.IsError); + } + + // --- helpers ------------------------------------------------------------- + + private static ToolInvokeResponse Err(ToolInvokeRequest req, string content) => new() + { + ToolCallId = req.ToolCallId, ToolName = req.ToolName, + Content = content, IsError = true + }; + + private static ToolInvokeResponse Ok(ToolInvokeRequest req, string content) => new() + { + ToolCallId = req.ToolCallId, ToolName = req.ToolName, + Content = content, IsError = false + }; + + private sealed record RecordedFailure( + ClusterKey Key, string? SessionId, string ErrorMessage, DateTimeOffset At); + + private sealed class RecordingClusterStore : IFailureClusterStore + { + public List Records { get; } = new(); + + public Task RecordAsync( + ClusterKey key, string? sessionId, string errorMessage, DateTimeOffset at, + CancellationToken cancellationToken = default) + { + Records.Add(new RecordedFailure(key, sessionId, errorMessage, at)); + return Task.CompletedTask; + } + + public Task> GetAllAsync(CancellationToken ct = default) => + Task.FromResult>([]); + + public Task> GetEscalatableAsync(DateTimeOffset now, CancellationToken ct = default) => + Task.FromResult>([]); + } + + private sealed class ThrowingClusterStore : IFailureClusterStore + { + public Task RecordAsync( + ClusterKey key, string? sessionId, string errorMessage, DateTimeOffset at, + CancellationToken cancellationToken = default) => + throw new InvalidOperationException("store offline"); + + public Task> GetAllAsync(CancellationToken ct = default) => + Task.FromResult>([]); + + public Task> GetEscalatableAsync(DateTimeOffset now, CancellationToken ct = default) => + Task.FromResult>([]); + } + + private sealed class FakeProvider( + Func match, + Func resolve) : IToolArgumentDefaultsProvider + { + public bool CanResolve(string s, string t, string f) => match(s, t, f); + public Task ResolveAsync(ResolveContext ctx, CancellationToken ct) => + Task.FromResult(resolve(ctx)); + } + + private sealed class NullFiller + { + public StageBLlmFiller Filler { get; } = new TestFiller(); + private sealed class TestFiller() : StageBLlmFiller( + new NoopLlmClient(), NullLogger.Instance) + { + public override Task TryFillAsync( + string serverName, string toolName, string fieldName, + IReadOnlyDictionary existingArgs, + string? originalErrorText, CancellationToken ct) => + Task.FromResult(null); + } + } + + private sealed class CannedFiller + { + public StageBLlmFiller Filler { get; } + public CannedFiller(string canned) => Filler = new TestFiller(canned); + + private sealed class TestFiller(string canned) : StageBLlmFiller( + new NoopLlmClient(), NullLogger.Instance) + { + public override Task TryFillAsync( + string serverName, string toolName, string fieldName, + IReadOnlyDictionary existingArgs, + string? originalErrorText, CancellationToken ct) + { + var doc = JsonDocument.Parse(canned); + return Task.FromResult(McpToolExecutor.ConvertJsonElement(doc.RootElement)); + } + } + } + + private sealed class NoopLlmClient : ILlmClient + { + public Task GetResponseAsync( + IEnumerable messages, + Microsoft.Extensions.AI.ChatOptions? options, + CancellationToken cancellationToken) => + throw new NotSupportedException(); + + public Task GetResponseAsync( + IEnumerable messages, + ModelTier tier, + Microsoft.Extensions.AI.ChatOptions? options, + CancellationToken cancellationToken) => + throw new NotSupportedException(); + } +}