Skip to content

Commit 4563f3b

Browse files
committed
Fixed an algorithm bug with subflows following the new Agentic path, which stops executions
1 parent 57daff2 commit 4563f3b

File tree

2 files changed

+37
-8
lines changed

2 files changed

+37
-8
lines changed

db-connector.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1739,10 +1739,9 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor
17391739
result = innerresult
17401740
break
17411741

1742-
} else if innerresult.Status == "WAITING" || innerresult.Status == "SUCCESS" && (action.AppName == "AI Agent" || action.AppName == "Shuffle Agent") {
1742+
//} else if innerresult.Status == "WAITING" || innerresult.Status == "SUCCESS" && (action.AppName == "AI Agent" || action.AppName == "Shuffle Agent") {
1743+
} else if (innerresult.Status == "WAITING" || innerresult.Status == "SUCCESS") && (innerresult.Action.AppName == "AI Agent" || innerresult.Action.AppName == "Shuffle Agent") {
17431744
// Auto fixing decision data based on cache for better decisionmaking
1744-
//log.Printf("[DEBUG] Found action result %s with WAITING status", action.AppName)
1745-
17461745
// Map the result into AgentOutput to check decisions
17471746
mappedOutput := AgentOutput{}
17481747
err = json.Unmarshal([]byte(innerresult.Result), &mappedOutput)

shared.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9322,6 +9322,24 @@ func HandleGetUsers(resp http.ResponseWriter, request *http.Request) {
93229322
continue
93239323
}
93249324

9325+
orgUser.ApiKey = ""
9326+
orgUser.Password = ""
9327+
orgUser.Session = ""
9328+
orgUser.UsersLastSession = ""
9329+
orgUser.VerificationToken = ""
9330+
orgUser.ValidatedSessionOrgs = []string{}
9331+
orgUser.Orgs = []string{}
9332+
orgUser.Authentication = []UserAuth{}
9333+
orgUser.PrivateApps = []WorkflowApp{}
9334+
orgUser.MFA = MFAInfo{
9335+
Active: orgUser.MFA.Active,
9336+
}
9337+
9338+
orgUser.ActiveOrg = OrgMini{}
9339+
if !orgUser.SupportAccess {
9340+
orgUser.LoginInfo = []LoginInfo{}
9341+
}
9342+
93259343
//orgUser.Deleted = true
93269344
orgUser.LoginType = "DELETED"
93279345
orgUser.Role = "user"
@@ -9341,7 +9359,6 @@ func HandleGetUsers(resp http.ResponseWriter, request *http.Request) {
93419359
}
93429360

93439361
if !found {
9344-
//log.Printf("[DEBUG] Adding user %s (%s) to list", item.Username, item.Id)
93459362
deduplicatedUsers = append(deduplicatedUsers, item)
93469363
}
93479364
}
@@ -11698,6 +11715,7 @@ func HandleChangeUserOrg(resp http.ResponseWriter, request *http.Request) {
1169811715
found := false
1169911716
for _, orgId := range user.Orgs {
1170011717
if orgId == org.Id {
11718+
usr.Role = "user"
1170111719
found = true
1170211720
break
1170311721
}
@@ -15741,7 +15759,7 @@ func updateExecutionParent(ctx context.Context, executionParent, returnValue, pa
1574115759
newCacheKey := fmt.Sprintf("%s_%s_sinkholed_result", executionParent, parentNode)
1574215760
cacheData, err := GetCache(ctx, newCacheKey)
1574315761
if err != nil {
15744-
log.Printf("[ERROR] Failed Getting sinkholed cache for subflow action result %s (4): %s", subflowExecutionId, err)
15762+
log.Printf("[ERROR][%s] Failed Getting sinkholed cache for subflow action result (4): %s", subflowExecutionId, err)
1574515763
} else {
1574615764

1574715765
mappedData, ok := cacheData.([]byte)
@@ -15750,6 +15768,9 @@ func updateExecutionParent(ctx context.Context, executionParent, returnValue, pa
1575015768
err = json.Unmarshal(mappedData, &actionValue)
1575115769
if err != nil {
1575215770
log.Printf("[ERROR] Failed unmarshalling cache for subflow action result %s (4): %s", subflowExecutionId, err)
15771+
if debug {
15772+
log.Printf("\n\nSinkholed result DATA: %s\n\n", string(mappedData))
15773+
}
1575315774
}
1575415775
} else {
1575515776
log.Printf("[ERROR] Failed type assertion for subflow action result %s (4): %s", subflowExecutionId, err)
@@ -17562,9 +17583,13 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut
1756217583
var subflowDataList []SubflowData
1756317584
err = json.Unmarshal([]byte(actionResult.Result), &subflowDataList)
1756417585

17586+
//if debug {
17587+
// log.Printf("\n\n\n\n\nSUBFLOW RESULT DATA: %#v\n\n\n\n\n", subflowData)
17588+
//}
17589+
1756517590
// This is in case the list is not an actual list
1756617591
if err != nil || len(subflowDataList) == 0 {
17567-
log.Printf("NOT sinkholed from subflow result: %s", err)
17592+
log.Printf("[WARNING][%s] NOT sinkholed from subflow result: %s", workflowExecution.ExecutionId, err)
1756817593
for resultIndex, result := range workflowExecution.Results {
1756917594
if result.Action.ID == actionResult.Action.ID {
1757017595
workflowExecution.Results[resultIndex] = actionResult
@@ -21779,7 +21804,12 @@ func ValidateNewWorkerExecution(ctx context.Context, body []byte, shouldReset bo
2177921804
if err != nil {
2178021805
log.Printf("[WARNING] Failed execution unmarshaling: %s", err)
2178121806
if strings.Contains(fmt.Sprintf("%s", err), "array into") {
21782-
log.Printf("Array unmarshal error: %s", string(body))
21807+
parsedBody := string(body)
21808+
if len(parsedBody) > 500 {
21809+
parsedBody = parsedBody[0:500] + "..."
21810+
}
21811+
21812+
log.Printf("[ERROR] Array unmarshal error in validate new worker execution: %s", parsedBody)
2178321813
}
2178421814

2178521815
return err
@@ -26236,7 +26266,7 @@ func executeAuthgroupSubflow(workflowExecution WorkflowExecution, authgroup AppA
2623626266
// 2. Parent workflow's owner is same org?
2623726267
// 3. Parent execution auth is correct
2623826268
func RunExecuteAccessValidation(request *http.Request, workflow *Workflow) (bool, string) {
26239-
if debug == true {
26269+
if debug {
2624026270
log.Printf("[DEBUG] Inside execute validation for workflow %s (%s)! Request method: %s. Queries: %#v", workflow.Name, workflow.ID, request.Method, request.URL.Query())
2624126271
}
2624226272

0 commit comments

Comments
 (0)