Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -9340,21 +9340,12 @@ data_filter:
return startNode, err
}

streamResp, err := client.Do(streamReq)
if err != nil {
log.Printf("[ERROR] AI Agent: Failed sending request for stream during SKIPPED user input: %s", err)
return startNode, err
_, _, streamErr := DoRequestWithRetry(client, streamReq, 3)
if streamErr != nil {
log.Printf("[ERROR] AI Agent: Failed sending request for stream during SKIPPED user input: %s", streamErr)
return startNode, streamErr
}

defer streamResp.Body.Close()
streamBody, err := ioutil.ReadAll(streamResp.Body)
if err != nil {
log.Printf("[ERROR] AI Agent: Failed reading response from sending request for stream during SKIPPED user input: %s", err)
return startNode, err
}

log.Printf("[INFO] Response from sending request for stream during SKIPPED user input: %d - %s", streamResp.StatusCode, string(streamBody))

return startNode, nil

}
Expand Down
63 changes: 10 additions & 53 deletions cloudSync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2010,8 +2010,8 @@ func RunAgentDecisionSingulActionHandler(execution WorkflowExecution, decision A
requestUrl := fmt.Sprintf("%s/api/v1/apps/categories/run?authorization=%s&execution_id=%s", baseUrl, execution.Authorization, execution.ExecutionId)

// Change timeout to be 300 seconds (just in case)
// Allows for reruns and self-correcting
client := GetExternalClient(requestUrl)
// Allows for reruns and self-correcting.
client := GetExternalClientWithTimeout(requestUrl, 0)
client.Timeout = 300 * time.Second

newFields := []schemaless.Valuereplace{}
Expand Down Expand Up @@ -2487,60 +2487,17 @@ func RunAgentDecisionAction(execution WorkflowExecution, agentOutput AgentOutput
return
}

const maxStreamRetries = 3
var req *http.Request
var lastStreamErr error
serverReceivedRequest := false

for attempt := 0; attempt < maxStreamRetries; attempt++ {
if attempt > 0 {
backoff := time.Duration(attempt*attempt) * time.Second
log.Printf("[WARNING][%s] AI Agent: Retrying streams POST for decision %s (attempt %d/%d) after %s: %v", execution.ExecutionId, decision.RunDetails.Id, attempt+1, maxStreamRetries, backoff, lastStreamErr)
time.Sleep(backoff)
}

req, err = http.NewRequest(
"POST",
url,
bytes.NewBuffer(marshalledAction),
)
if err != nil {
log.Printf("[ERROR][%s] AI Agent: Failed agent decision request creation on retry %d: %s", execution.ExecutionId, attempt+1, err)
lastStreamErr = err
continue
}

req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
log.Printf("[ERROR][%s] AI Agent: Failed sending agent decision result (attempt %d): %s", execution.ExecutionId, attempt+1, err)
lastStreamErr = err
continue
}

serverReceivedRequest = true

foundBody, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Printf("[WARNING][%s] AI Agent: Decision %s POSTed to streams (status %d) but failed reading response body: %s. Treating as success.", execution.ExecutionId, decision.RunDetails.Id, resp.StatusCode, err)
return
}

if resp.StatusCode == 200 {
streamReq, err := http.NewRequest("POST", url, bytes.NewBuffer(marshalledAction))
if err != nil {
log.Printf("[ERROR][%s] AI Agent: Failed agent decision request creation: %s", execution.ExecutionId, err)
} else {
streamReq.Header.Set("Content-Type", "application/json")
_, _, streamErr := DoRequestWithRetry(client, streamReq, 3)
if streamErr == nil {
return
}

log.Printf("[ERROR][%s] AI Agent: Status %d for decision %s (attempt %d). Body: %s", execution.ExecutionId, resp.StatusCode, decision.RunDetails.Id, attempt+1, string(foundBody))
lastStreamErr = fmt.Errorf("streams POST returned status %d", resp.StatusCode)

if resp.StatusCode >= 400 && resp.StatusCode < 500 {
break
}

log.Printf("[ERROR][%s] AI Agent: All attempts to POST decision %s to streams failed: %v. Falling back to in-process handler.", execution.ExecutionId, decision.RunDetails.Id, streamErr)
}

log.Printf("[ERROR][%s] AI Agent: All %d attempts to POST decision %s to streams failed (serverReceived=%v). Last error: %v. Falling back to in-process handler.", execution.ExecutionId, maxStreamRetries, decision.RunDetails.Id, serverReceivedRequest, lastStreamErr)
// Try the in-process handler to keep the agent moving when the streams API is unavailable.
freshExec, err := GetWorkflowExecution(context.Background(), execution.ExecutionId)
if err != nil {
Expand Down
79 changes: 37 additions & 42 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -17594,6 +17594,7 @@ func DoRequestWithRetry(client *http.Client, req *http.Request, maxAttempts int)

var requestBody []byte
var err error

if req.GetBody == nil && req.Body != nil {
requestBody, err = io.ReadAll(req.Body)
if err != nil {
Expand All @@ -17605,12 +17606,14 @@ func DoRequestWithRetry(client *http.Client, req *http.Request, maxAttempts int)
}

var lastErr error

for attempt := 0; attempt < maxAttempts; attempt++ {
if attempt > 0 {
time.Sleep(time.Duration(attempt) * 2 * time.Second)
}

clonedReq := req.Clone(req.Context())

if req.GetBody != nil {
clonedReq.Body, err = req.GetBody()
if err != nil {
Expand All @@ -17622,16 +17625,19 @@ func DoRequestWithRetry(client *http.Client, req *http.Request, maxAttempts int)
}

resp, err := client.Do(clonedReq)

if err != nil {
if resp != nil {
if resp.Body != nil {
resp.Body.Close()
}
if resp != nil && resp.Body != nil {
resp.Body.Close()
}

return resp, nil, err
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
return nil, nil, err
}

lastErr = err

if attempt+1 < maxAttempts {
continue
}
Expand All @@ -17641,11 +17647,23 @@ func DoRequestWithRetry(client *http.Client, req *http.Request, maxAttempts int)

body, readErr := io.ReadAll(resp.Body)
resp.Body.Close()

if readErr != nil {
return resp, nil, readErr
return nil, nil, readErr
}

resp.Body = io.NopCloser(bytes.NewReader(body))

if resp.StatusCode == 429 {
lastErr = fmt.Errorf("rate limited: %s", string(body))

if attempt+1 < maxAttempts {
continue
}

return resp, body, lastErr
}

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return resp, body, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body))
}
Expand Down Expand Up @@ -17762,44 +17780,13 @@ func sendAgentActionSelfRequest(status string, workflowExecution WorkflowExecuti
fullUrl := fmt.Sprintf("%s/api/v1/streams", baseUrl)
client := &http.Client{}

var streamErr error
var body []byte
for attempt := 0; attempt < 3; attempt++ {
if attempt > 0 {
time.Sleep(time.Duration(attempt) * 2 * time.Second)
}

req, err := http.NewRequest("POST", fullUrl, bytes.NewBuffer(marshalledResult))
if err != nil {
log.Printf("[ERROR][%s] Error building agent '%s' request: %s", workflowExecution.ExecutionId, status, err)
return err
}

resp, err := client.Do(req)
if err != nil {
log.Printf("[WARNING][%s] Error on agent '%s' request (attempt %d/3): %s", workflowExecution.ExecutionId, status, attempt+1, err)
streamErr = err
continue
}

body, err = ioutil.ReadAll(resp.Body)
resp.Body.Close() // no defer since we need to close it before the next attempt : )
if err != nil {
log.Printf("[WARNING][%s] Failed reading agent '%s' body (attempt %d/3): %s", workflowExecution.ExecutionId, status, attempt+1, err)
streamErr = err
continue
}

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
log.Printf("[WARNING][%s] Non-2xx (%d) from /api/v1/streams for agent '%s' (attempt %d/3): %s", workflowExecution.ExecutionId, resp.StatusCode, status, attempt+1, string(body))
streamErr = errors.New(fmt.Sprintf("No result in %s request for agent", status))
continue
}

streamErr = nil
break
streamReq, err := http.NewRequest("POST", fullUrl, bytes.NewBuffer(marshalledResult))
if err != nil {
log.Printf("[ERROR][%s] Error building agent '%s' request: %s", workflowExecution.ExecutionId, status, err)
return err
}

_, _, streamErr := DoRequestWithRetry(client, streamReq, 3)
if streamErr != nil {
log.Printf("[ERROR][%s] Failed sending self-request with '%s' for agent after 3 attempts: %s", workflowExecution.ExecutionId, status, streamErr)
return streamErr
Expand Down Expand Up @@ -32301,6 +32288,14 @@ func GetExternalClient(baseUrl string) *http.Client {
return client
}

func GetExternalClientWithTimeout(baseUrl string, responseHeaderTimeout time.Duration) *http.Client {
client := GetExternalClient(baseUrl)
if t, ok := client.Transport.(*http.Transport); ok {
t.ResponseHeaderTimeout = responseHeaderTimeout
}
return client
}

// Function with the name RemoveFromArray to remove a string from a string array
func RemoveFromArray(array []string, element string) []string {
for i, v := range array {
Expand Down
Loading