using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Text.Json;
using AxCopilot.Models;
using AxCopilot.Services;

namespace AxCopilot.Services.Agent;

/// <summary>
/// Coordinates LLM requests that may carry tool calls: optionally pre-executes
/// allow-listed read-only tools, streams the model response while emitting
/// periodic "still waiting" heartbeat events, and retries on context-overflow
/// and transient LLM errors.
/// </summary>
/// <remarks>
/// NOTE(review): in the copy of this file under review, the generic type
/// arguments on several declarations (for example <c>HashSet</c>, the
/// <c>Func</c>/<c>Action</c> fields and parameters, <c>Task</c> return types and
/// <c>new List()</c>) appear to have been stripped by a text-extraction step.
/// Those declarations are reproduced exactly as found; confirm them against the
/// real source before merging.
/// </remarks>
internal sealed class StreamingToolExecutionCoordinator : IToolExecutionCoordinator
{
    /// <summary>Maximum number of context-overflow recoveries per run.</summary>
    private const int MaxContextRecoveryRetries = 2;

    /// <summary>Maximum number of transient-error retries per run.</summary>
    private const int MaxTransientRetries = 3;

    /// <summary>
    /// Cached empty JSON object (<c>{}</c>) used when a tool call carries no input.
    /// It is a clone, so it stays valid after the backing document is disposed.
    /// </summary>
    private static readonly JsonElement EmptyJsonObject = CreateEmptyJsonObject();

    /// <summary>Tool names (case-insensitive) that may be executed ahead of time.</summary>
    private static readonly HashSet PrefetchableReadOnlyTools = new(StringComparer.OrdinalIgnoreCase)
    {
        "file_read",
        "document_read",
        "env_tool",
        "datetime_tool",
        "dev_env_detect",
        "memory",
        "json_tool",
        "regex_tool",
        "base64_tool",
        "hash_tool",
        "image_analyze"
    };

    private readonly ILlmService _llm;
    private readonly Func, string> _resolveRequestedToolName;
    private readonly Func?, CancellationToken, Task> _executeToolAsync;
    private readonly Action _emitEvent;
    private readonly Func _isContextOverflowError;
    private readonly Func, bool> _forceContextRecovery;
    private readonly Func _isTransientLlmError;
    private readonly Func _computeTransientBackoffDelayMs;
    private readonly TimeSpan _firstResponseHeartbeatDelay;
    private readonly TimeSpan _responseHeartbeatInterval;

    /// <summary>
    /// Creates the coordinator from the LLM service and the callbacks it delegates to.
    /// </summary>
    /// <param name="llm">LLM service used for both streaming and non-streaming requests.</param>
    /// <param name="resolveRequestedToolName">Maps a requested tool name onto one of the active tool names.</param>
    /// <param name="executeToolAsync">Executes a tool by name with its JSON input.</param>
    /// <param name="emitEvent">Publishes agent progress events.</param>
    /// <param name="isContextOverflowError">Classifies an exception message as a context-overflow error.</param>
    /// <param name="forceContextRecovery">Attempts to recover from context overflow; its boolean result gates the retry.</param>
    /// <param name="isTransientLlmError">Classifies an exception as transient (retryable).</param>
    /// <param name="computeTransientBackoffDelayMs">Computes the delay in milliseconds before a transient retry.</param>
    /// <param name="firstResponseHeartbeatDelay">Wait before the first heartbeat; defaults to 8 seconds.</param>
    /// <param name="responseHeartbeatInterval">Interval between later heartbeats; defaults to 15 seconds.</param>
    public StreamingToolExecutionCoordinator(
        ILlmService llm,
        Func, string> resolveRequestedToolName,
        Func?, CancellationToken, Task> executeToolAsync,
        Action emitEvent,
        Func isContextOverflowError,
        Func, bool> forceContextRecovery,
        Func isTransientLlmError,
        Func computeTransientBackoffDelayMs,
        TimeSpan? firstResponseHeartbeatDelay = null,
        TimeSpan? responseHeartbeatInterval = null)
    {
        _llm = llm;
        _resolveRequestedToolName = resolveRequestedToolName;
        _executeToolAsync = executeToolAsync;
        _emitEvent = emitEvent;
        _isContextOverflowError = isContextOverflowError;
        _forceContextRecovery = forceContextRecovery;
        _isTransientLlmError = isTransientLlmError;
        _computeTransientBackoffDelayMs = computeTransientBackoffDelayMs;
        _firstResponseHeartbeatDelay = firstResponseHeartbeatDelay ?? TimeSpan.FromSeconds(8);
        _responseHeartbeatInterval = responseHeartbeatInterval ?? TimeSpan.FromSeconds(15);
    }

    /// <summary>
    /// Resolves the tool requested by <paramref name="block"/> and, when it is on
    /// the prefetch allow-list, executes it immediately.
    /// </summary>
    /// <param name="block">Tool-call block; its <c>ResolvedToolName</c> is always updated.</param>
    /// <param name="tools">Currently active tools; their names are the resolution candidates.</param>
    /// <param name="context">Agent context forwarded to the tool executor.</param>
    /// <param name="ct">Cancellation token forwarded to the tool executor.</param>
    /// <returns>
    /// <c>null</c> when the resolved tool is not prefetchable; otherwise a result
    /// carrying the tool outcome, the elapsed milliseconds and the resolved name.
    /// Any exception thrown by the executor is converted into a failed result
    /// instead of propagating.
    /// </returns>
    public async Task TryPrefetchReadOnlyToolAsync(
        ContentBlock block,
        IReadOnlyCollection tools,
        AgentContext context,
        CancellationToken ct)
    {
        var activeToolNames = tools
            .Select(t => t.Name)
            .Distinct(StringComparer.OrdinalIgnoreCase)
            .ToList();
        var resolvedToolName = _resolveRequestedToolName(block.ToolName, activeToolNames);
        block.ResolvedToolName = resolvedToolName;

        if (!PrefetchableReadOnlyTools.Contains(resolvedToolName))
        {
            return null;
        }

        _emitEvent(
            AgentEventType.Thinking,
            resolvedToolName,
            $"읽기 도구 조기 실행 준비: {resolvedToolName}");

        var sw = Stopwatch.StartNew();
        try
        {
            // Use the cached empty object rather than parsing a new JsonDocument
            // that would never be disposed.
            var input = block.ToolInput ?? EmptyJsonObject;
            var result = await _executeToolAsync(resolvedToolName, input, context, null, ct);
            sw.Stop();
            return new ToolPrefetchResult(result, sw.ElapsedMilliseconds, resolvedToolName);
        }
        catch (Exception ex)
        {
            // Prefetch is best-effort: report the failure as a tool result.
            sw.Stop();
            return new ToolPrefetchResult(
                ToolResult.Fail($"조기 실행 오류: {ex.Message}"),
                sw.ElapsedMilliseconds,
                resolvedToolName);
        }
    }

    /// <summary>
    /// Sends <paramref name="messages"/> to the LLM with tool support and retries
    /// on recoverable failures.
    /// </summary>
    /// <remarks>
    /// Without <paramref name="onStreamEventAsync"/> the request is a single
    /// non-streaming call. With it, the response is streamed: every event is
    /// forwarded to the callback, text deltas are concatenated, and ready tool
    /// calls are collected. While waiting for stream events, heartbeat events are
    /// emitted. On a context-overflow error (at most 2 recoveries) or a transient
    /// error (at most 3 retries, with backoff) the request is repeated; if partial
    /// output had already been streamed, a <c>RetryReset</c> event is sent to the
    /// callback first. Retry counters start from and are written back to
    /// <paramref name="runState"/> when it is supplied.
    /// </remarks>
    /// <param name="messages">Conversation sent to the model; also passed to the context-recovery callback.</param>
    /// <param name="tools">Tools offered to the model.</param>
    /// <param name="ct">Cancels the request, the heartbeat waits and the backoff delay.</param>
    /// <param name="phaseLabel">Prefix used in log lines and progress messages.</param>
    /// <param name="runState">Optional run state holding the retry counters.</param>
    /// <param name="forceToolCall">Forwarded to the LLM service.</param>
    /// <param name="prefetchToolCallAsync">Optional prefetch callback forwarded to the LLM service.</param>
    /// <param name="onStreamEventAsync">Optional per-event callback; enables streaming mode.</param>
    /// <returns>
    /// In streaming mode: an optional leading text block (trimmed, omitted when
    /// blank) followed by the tool-call blocks in arrival order. Otherwise the
    /// result of the non-streaming call.
    /// </returns>
    public async Task> SendWithToolsWithRecoveryAsync(
        List messages,
        IReadOnlyCollection tools,
        CancellationToken ct,
        string phaseLabel,
        AgentLoopService.RunState? runState = null,
        bool forceToolCall = false,
        Func>? prefetchToolCallAsync = null,
        Func? onStreamEventAsync = null)
    {
        var transientRetries = runState?.TransientLlmErrorRetries ?? 0;
        var contextRecoveryRetries = runState?.ContextRecoveryAttempts ?? 0;

        // Tells the stream consumer to discard partial output before a retry.
        async Task NotifyRetryResetAsync(bool hadPartialState)
        {
            if (onStreamEventAsync != null && hadPartialState)
            {
                await onStreamEventAsync(
                    new ToolStreamEvent(ToolStreamEventKind.RetryReset, $"{phaseLabel}:retry"));
            }
        }

        while (true)
        {
            var streamedAnyPartialState = false;
            try
            {
                if (onStreamEventAsync == null)
                {
                    return await _llm.SendWithToolsAsync(messages, tools, ct, forceToolCall, prefetchToolCallAsync);
                }

                var blocks = new List();
                var textBuilder = new StringBuilder();
                var (service, model) = _llm.GetCurrentModelInfo();
                LogService.Info(
                    $"[AgentLoopWait] {phaseLabel}: LLM 요청 시작 (service={service}, model={model}, messages={messages.Count}, tools={tools.Count}, forceToolCall={forceToolCall})");
                _emitEvent(AgentEventType.Thinking, "", $"{phaseLabel}: 모델에 요청하는 중입니다...");

                var waitStopwatch = Stopwatch.StartNew();
                var firstEventReceived = false;
                var nextHeartbeatAt = _firstResponseHeartbeatDelay;

                await using var stream = _llm
                    .StreamWithToolsAsync(messages, tools, forceToolCall, prefetchToolCallAsync, ct)
                    .GetAsyncEnumerator(ct);

                while (true)
                {
                    var moveNextTask = stream.MoveNextAsync().AsTask();
                    nextHeartbeatAt = await WaitWithHeartbeatsAsync(
                        moveNextTask,
                        waitStopwatch,
                        nextHeartbeatAt,
                        phaseLabel,
                        firstEventReceived,
                        ct).ConfigureAwait(false);

                    // Always await the pending MoveNextAsync (even after
                    // cancellation) so the enumerator is idle before disposal.
                    if (!await moveNextTask.ConfigureAwait(false))
                    {
                        break;
                    }

                    var evt = stream.Current;
                    if (!firstEventReceived)
                    {
                        firstEventReceived = true;
                        LogService.Debug(
                            $"[AgentLoopWait] {phaseLabel}: 첫 응답 수신 ({waitStopwatch.ElapsedMilliseconds}ms, kind={evt.Kind})");
                        if (waitStopwatch.Elapsed >= _firstResponseHeartbeatDelay)
                        {
                            _emitEvent(AgentEventType.Thinking, "", $"{phaseLabel}: 모델 첫 응답을 받아 계속 진행합니다.");
                        }
                    }

                    await onStreamEventAsync(evt);

                    // Keep whitespace-only deltas: dropping them would glue words
                    // and paragraphs together in the assembled text.
                    if (evt.Kind == ToolStreamEventKind.TextDelta && !string.IsNullOrEmpty(evt.Text))
                    {
                        streamedAnyPartialState = true;
                        textBuilder.Append(evt.Text);
                    }
                    else if (evt.Kind == ToolStreamEventKind.ToolCallReady && evt.ToolCall != null)
                    {
                        streamedAnyPartialState = true;
                        blocks.Add(evt.ToolCall);
                    }
                }

                var result = new List();
                var text = textBuilder.ToString().Trim();
                if (!string.IsNullOrWhiteSpace(text))
                {
                    result.Add(new ContentBlock { Type = "text", Text = text });
                }

                result.AddRange(blocks);
                return result;
            }
            catch (Exception ex)
            {
                if (_isContextOverflowError(ex.Message)
                    && contextRecoveryRetries < MaxContextRecoveryRetries
                    && _forceContextRecovery(messages))
                {
                    await NotifyRetryResetAsync(streamedAnyPartialState);
                    contextRecoveryRetries++;
                    if (runState != null)
                    {
                        runState.ContextRecoveryAttempts = contextRecoveryRetries;
                    }

                    _emitEvent(
                        AgentEventType.Thinking,
                        "",
                        $"{phaseLabel}: 컨텍스트 한도 초과로 대화를 압축한 후 재시도합니다 ({contextRecoveryRetries}/{MaxContextRecoveryRetries})");
                    continue;
                }

                if (ct.IsCancellationRequested)
                {
                    throw;
                }

                if (_isTransientLlmError(ex) && transientRetries < MaxTransientRetries)
                {
                    await NotifyRetryResetAsync(streamedAnyPartialState);
                    transientRetries++;
                    if (runState != null)
                    {
                        runState.TransientLlmErrorRetries = transientRetries;
                    }

                    var delayMs = _computeTransientBackoffDelayMs(transientRetries, ex);
                    _emitEvent(
                        AgentEventType.Thinking,
                        "",
                        $"{phaseLabel}: 일시적 LLM 오류로 {delayMs}ms 후 재시도합니다 ({transientRetries}/{MaxTransientRetries})");
                    await Task.Delay(delayMs, ct);
                    continue;
                }

                throw;
            }
        }
    }

    /// <summary>
    /// Waits for <paramref name="moveNextTask"/> to complete, emitting a heartbeat
    /// each time the heartbeat deadline passes first.
    /// </summary>
    /// <param name="moveNextTask">Pending stream advance; it is not awaited here, so its exception stays with the caller.</param>
    /// <param name="waitStopwatch">Stopwatch measuring the total wait of the current request.</param>
    /// <param name="nextHeartbeatAt">Elapsed time at which the next heartbeat is due.</param>
    /// <param name="phaseLabel">Prefix used in the heartbeat message.</param>
    /// <param name="firstEventReceived">Selects the "first response" or "long response" wording.</param>
    /// <param name="ct">When cancelled, the wait returns without further heartbeats.</param>
    /// <returns>The updated heartbeat deadline to use for the next wait.</returns>
    private async Task<TimeSpan> WaitWithHeartbeatsAsync(
        Task<bool> moveNextTask,
        Stopwatch waitStopwatch,
        TimeSpan nextHeartbeatAt,
        string phaseLabel,
        bool firstEventReceived,
        CancellationToken ct)
    {
        while (!moveNextTask.IsCompleted)
        {
            var remaining = nextHeartbeatAt - waitStopwatch.Elapsed;
            if (remaining < TimeSpan.Zero)
            {
                remaining = TimeSpan.Zero;
            }

            // A linked source lets us release the timer as soon as the stream
            // wins the race, instead of leaving it pending until it fires.
            using var delayCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            var delayTask = Task.Delay(remaining, delayCts.Token);
            var completedTask = await Task.WhenAny(moveNextTask, delayTask).ConfigureAwait(false);
            if (completedTask == moveNextTask)
            {
                delayCts.Cancel();
                break;
            }

            // After cancellation the delay completes immediately every time;
            // stop here so the loop cannot spin and flood heartbeat events.
            if (ct.IsCancellationRequested)
            {
                break;
            }

            var waited = waitStopwatch.Elapsed;
            EmitWaitHeartbeat(phaseLabel, waited, firstEventReceived);
            nextHeartbeatAt = waited + _responseHeartbeatInterval;
        }

        return nextHeartbeatAt;
    }

    /// <summary>
    /// Logs and emits a "still waiting" progress message. The reported duration
    /// is rounded to whole seconds and is never less than 1.
    /// </summary>
    private void EmitWaitHeartbeat(string phaseLabel, TimeSpan waited, bool firstEventReceived)
    {
        var seconds = Math.Max(1, (int)Math.Round(waited.TotalSeconds));
        var summary = firstEventReceived
            ? $"{phaseLabel}: 모델 응답이 길어져 계속 기다리는 중입니다... ({seconds}초)"
            : $"{phaseLabel}: 모델 첫 응답을 기다리는 중입니다... ({seconds}초)";
        LogService.Debug($"[AgentLoopWait] {summary}");
        _emitEvent(AgentEventType.Thinking, "", summary);
    }

    /// <summary>Builds a standalone <c>{}</c> element that does not depend on a live document.</summary>
    private static JsonElement CreateEmptyJsonObject()
    {
        using var document = JsonDocument.Parse("{}");
        return document.RootElement.Clone();
    }
}