diff --git a/streaming.go b/streaming.go index 2eb8e522..b5a7f4bc 100644 --- a/streaming.go +++ b/streaming.go @@ -302,6 +302,46 @@ func HandleStreamWorkflow(resp http.ResponseWriter, request *http.Request) { var lastSentSeq int64 = sinceSeq var pollCount int + // On initial connect (since=0), flush the delta ops since the last save so late joiners + // catch up to unsaved changes made by other users before they arrived. + if sinceSeq == 0 { + cache, err := GetCache(ctx, sessionKey) + if err == nil { + cacheData, ok := cache.([]uint8) + if ok { + var state StreamWorkflowState + if err := json.Unmarshal(cacheData, &state); err == nil && len(state.Operations) > 0 { + // Find the sequence of the last save op — that's the catch-up baseline + var lastSaveSeq int64 + for _, op := range state.Operations { + if op.Item == "workflow" && op.Type == "save" { + lastSaveSeq = op.Sequence + } + } + + for _, op := range state.Operations { + if op.Sequence <= lastSaveSeq { + continue + } + + if op.Type == "select" || op.Type == "unselect" || op.Type == "hover" || op.Type == "enter" { + continue + } + opBytes, err := json.Marshal(op) + if err != nil { + continue + } + fmt.Fprintf(resp, "%s\n", string(opBytes)) + lastSentSeq = op.Sequence + } + } + } + } + + fmt.Fprintf(resp, "%s\n", `{"item":"system","type":"init_complete"}`) + conn.Flush() + } + for { pollCount++ if pollCount%streamPresenceInterval == 1 {