diff --git a/ai.go b/ai.go index 9fadb802..0f508c42 100644 --- a/ai.go +++ b/ai.go @@ -9340,21 +9340,12 @@ data_filter: return startNode, err } - streamResp, err := client.Do(streamReq) - if err != nil { - log.Printf("[ERROR] AI Agent: Failed sending request for stream during SKIPPED user input: %s", err) - return startNode, err + _, _, streamErr := DoRequestWithRetry(client, streamReq, 3) + if streamErr != nil { + log.Printf("[ERROR] AI Agent: Failed sending request for stream during SKIPPED user input: %s", streamErr) + return startNode, streamErr } - defer streamResp.Body.Close() - streamBody, err := ioutil.ReadAll(streamResp.Body) - if err != nil { - log.Printf("[ERROR] AI Agent: Failed reading response from sending request for stream during SKIPPED user input: %s", err) - return startNode, err - } - - log.Printf("[INFO] Response from sending request for stream during SKIPPED user input: %d - %s", streamResp.StatusCode, string(streamBody)) - return startNode, nil } diff --git a/cloudSync.go b/cloudSync.go index 3bd6163a..936cb9e9 100755 --- a/cloudSync.go +++ b/cloudSync.go @@ -2010,8 +2010,8 @@ func RunAgentDecisionSingulActionHandler(execution WorkflowExecution, decision A requestUrl := fmt.Sprintf("%s/api/v1/apps/categories/run?authorization=%s&execution_id=%s", baseUrl, execution.Authorization, execution.ExecutionId) // Change timeout to be 300 seconds (just in case) - // Allows for reruns and self-correcting - client := GetExternalClient(requestUrl) + // Allows for reruns and self-correcting. + client := GetExternalClientWithTimeout(requestUrl, 0) client.Timeout = 300 * time.Second newFields := []schemaless.Valuereplace{} @@ -2487,60 +2487,17 @@ func RunAgentDecisionAction(execution WorkflowExecution, agentOutput AgentOutput return } - const maxStreamRetries = 3 - var req *http.Request - var lastStreamErr error - serverReceivedRequest := false - - for attempt := 0; attempt < maxStreamRetries; attempt++ { - if attempt > 0 { - backoff := time.Duration(attempt*attempt) * time.Second - log.Printf("[WARNING][%s] AI Agent: Retrying streams POST for decision %s (attempt %d/%d) after %s: %v", execution.ExecutionId, decision.RunDetails.Id, attempt+1, maxStreamRetries, backoff, lastStreamErr) - time.Sleep(backoff) - } - - req, err = http.NewRequest( - "POST", - url, - bytes.NewBuffer(marshalledAction), - ) - if err != nil { - log.Printf("[ERROR][%s] AI Agent: Failed agent decision request creation on retry %d: %s", execution.ExecutionId, attempt+1, err) - lastStreamErr = err - continue - } - - req.Header.Set("Content-Type", "application/json") - resp, err := client.Do(req) - if err != nil { - log.Printf("[ERROR][%s] AI Agent: Failed sending agent decision result (attempt %d): %s", execution.ExecutionId, attempt+1, err) - lastStreamErr = err - continue - } - - serverReceivedRequest = true - - foundBody, err := ioutil.ReadAll(resp.Body) - resp.Body.Close() - if err != nil { - log.Printf("[WARNING][%s] AI Agent: Decision %s POSTed to streams (status %d) but failed reading response body: %s. Treating as success.", execution.ExecutionId, decision.RunDetails.Id, resp.StatusCode, err) - return - } - - if resp.StatusCode == 200 { + streamReq, err := http.NewRequest("POST", url, bytes.NewBuffer(marshalledAction)) + if err != nil { + log.Printf("[ERROR][%s] AI Agent: Failed agent decision request creation: %s", execution.ExecutionId, err) + } else { + streamReq.Header.Set("Content-Type", "application/json") + _, _, streamErr := DoRequestWithRetry(client, streamReq, 3) + if streamErr == nil { return } - - log.Printf("[ERROR][%s] AI Agent: Status %d for decision %s (attempt %d). Body: %s", execution.ExecutionId, resp.StatusCode, decision.RunDetails.Id, attempt+1, string(foundBody)) - lastStreamErr = fmt.Errorf("streams POST returned status %d", resp.StatusCode) - - if resp.StatusCode >= 400 && resp.StatusCode < 500 { - break - } - + log.Printf("[ERROR][%s] AI Agent: All attempts to POST decision %s to streams failed: %v. Falling back to in-process handler.", execution.ExecutionId, decision.RunDetails.Id, streamErr) } - - log.Printf("[ERROR][%s] AI Agent: All %d attempts to POST decision %s to streams failed (serverReceived=%v). Last error: %v. Falling back to in-process handler.", execution.ExecutionId, maxStreamRetries, decision.RunDetails.Id, serverReceivedRequest, lastStreamErr) // Try the in-process handler to keep the agent moving when the streams API is unavailable. freshExec, err := GetWorkflowExecution(context.Background(), execution.ExecutionId) if err != nil { diff --git a/shared.go b/shared.go index d619bdf2..f695e32b 100644 --- a/shared.go +++ b/shared.go @@ -17594,6 +17594,7 @@ func DoRequestWithRetry(client *http.Client, req *http.Request, maxAttempts int) var requestBody []byte var err error + if req.GetBody == nil && req.Body != nil { requestBody, err = io.ReadAll(req.Body) if err != nil { @@ -17605,12 +17606,14 @@ func DoRequestWithRetry(client *http.Client, req *http.Request, maxAttempts int) } var lastErr error + for attempt := 0; attempt < maxAttempts; attempt++ { if attempt > 0 { time.Sleep(time.Duration(attempt) * 2 * time.Second) } clonedReq := req.Clone(req.Context()) + if req.GetBody != nil { clonedReq.Body, err = req.GetBody() if err != nil { @@ -17622,16 +17625,19 @@ func DoRequestWithRetry(client *http.Client, req *http.Request, maxAttempts int) } resp, err := client.Do(clonedReq) + if err != nil { - if resp != nil { - if resp.Body != nil { - resp.Body.Close() - } + if resp != nil && resp.Body != nil { + resp.Body.Close() + } - return resp, nil, err + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return nil, nil, err } lastErr = err + if attempt+1 < maxAttempts { continue } @@ -17641,11 +17647,23 @@ func DoRequestWithRetry(client *http.Client, req *http.Request, maxAttempts int) body, readErr := io.ReadAll(resp.Body) resp.Body.Close() + if readErr != nil { - return resp, nil, readErr + return nil, nil, readErr } resp.Body = io.NopCloser(bytes.NewReader(body)) + + if resp.StatusCode == 429 { + lastErr = fmt.Errorf("rate limited: %s", string(body)) + + if attempt+1 < maxAttempts { + continue + } + + return resp, body, lastErr + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { return resp, body, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body)) } @@ -17762,44 +17780,13 @@ func sendAgentActionSelfRequest(status string, workflowExecution WorkflowExecuti fullUrl := fmt.Sprintf("%s/api/v1/streams", baseUrl) client := &http.Client{} - var streamErr error - var body []byte - for attempt := 0; attempt < 3; attempt++ { - if attempt > 0 { - time.Sleep(time.Duration(attempt) * 2 * time.Second) - } - - req, err := http.NewRequest("POST", fullUrl, bytes.NewBuffer(marshalledResult)) - if err != nil { - log.Printf("[ERROR][%s] Error building agent '%s' request: %s", workflowExecution.ExecutionId, status, err) - return err - } - - resp, err := client.Do(req) - if err != nil { - log.Printf("[WARNING][%s] Error on agent '%s' request (attempt %d/3): %s", workflowExecution.ExecutionId, status, attempt+1, err) - streamErr = err - continue - } - - body, err = ioutil.ReadAll(resp.Body) - resp.Body.Close() // no defer since we need to close it before the next attempt : ) - if err != nil { - log.Printf("[WARNING][%s] Failed reading agent '%s' body (attempt %d/3): %s", workflowExecution.ExecutionId, status, attempt+1, err) - streamErr = err - continue - } - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - log.Printf("[WARNING][%s] Non-2xx (%d) from /api/v1/streams for agent '%s' (attempt %d/3): %s", workflowExecution.ExecutionId, resp.StatusCode, status, attempt+1, string(body)) - streamErr = errors.New(fmt.Sprintf("No result in %s request for agent", status)) - continue - } - - streamErr = nil - break + streamReq, err := http.NewRequest("POST", fullUrl, bytes.NewBuffer(marshalledResult)) + if err != nil { + log.Printf("[ERROR][%s] Error building agent '%s' request: %s", workflowExecution.ExecutionId, status, err) + return err } + _, _, streamErr := DoRequestWithRetry(client, streamReq, 3) if streamErr != nil { log.Printf("[ERROR][%s] Failed sending self-request with '%s' for agent after 3 attempts: %s", workflowExecution.ExecutionId, status, streamErr) return streamErr @@ -32301,6 +32288,14 @@ func GetExternalClient(baseUrl string) *http.Client { return client } +func GetExternalClientWithTimeout(baseUrl string, responseHeaderTimeout time.Duration) *http.Client { + client := GetExternalClient(baseUrl) + if t, ok := client.Transport.(*http.Transport); ok { + t.ResponseHeaderTimeout = responseHeaderTimeout + } + return client +} + // Function with the name RemoveFromArray to remove a string from a string array func RemoveFromArray(array []string, element string) []string { for i, v := range array {