From a601696030edf8074a8195f459e9bac3605c9c52 Mon Sep 17 00:00:00 2001 From: monilprajapati Date: Wed, 3 Jun 2026 11:38:22 +0530 Subject: [PATCH 1/2] Stream the changes after the last saved state for new joinee --- streaming.go | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/streaming.go b/streaming.go index 2eb8e522..42e50f03 100644 --- a/streaming.go +++ b/streaming.go @@ -302,6 +302,47 @@ func HandleStreamWorkflow(resp http.ResponseWriter, request *http.Request) { var lastSentSeq int64 = sinceSeq var pollCount int + // On initial connect (since=0), flush the delta ops since the last save so late joiners + // catch up to unsaved changes made by other users before they arrived. + if sinceSeq == 0 { + cache, err := GetCache(ctx, sessionKey) + if err == nil { + cacheData, ok := cache.([]uint8) + if ok { + var state StreamWorkflowState + if err := json.Unmarshal(cacheData, &state); err == nil && len(state.Operations) > 0 { + // Find the sequence of the last save op — that's the catch-up baseline + var lastSaveSeq int64 + for _, op := range state.Operations { + if op.Item == "workflow" && op.Type == "save" { + lastSaveSeq = op.Sequence + } + } + + for _, op := range state.Operations { + if op.Sequence <= lastSaveSeq { + continue + } + // Skip ephemeral ops — they reflect transient UI state, not structural changes + if op.Type == "select" || op.Type == "unselect" || op.Type == "hover" || op.Type == "enter" { + continue + } + opBytes, err := json.Marshal(op) + if err != nil { + continue + } + fmt.Fprintf(resp, "%s\n", string(opBytes)) + lastSentSeq = op.Sequence + } + } + } + } + + // Signal to the client that catch-up is done — it can now arm its time filter + fmt.Fprintf(resp, "%s\n", `{"item":"system","type":"init_complete"}`) + conn.Flush() + } + for { pollCount++ if pollCount%streamPresenceInterval == 1 { From efb9edb57700ce69e3ba6114482f8d761ff52de5 Mon Sep 17 00:00:00 2001 From: monilprajapati Date: Wed, 3 Jun 2026 12:02:20 +0530 Subject: [PATCH 2/2] cleanup --- streaming.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/streaming.go b/streaming.go index 42e50f03..b5a7f4bc 100644 --- a/streaming.go +++ b/streaming.go @@ -323,7 +323,7 @@ func HandleStreamWorkflow(resp http.ResponseWriter, request *http.Request) { if op.Sequence <= lastSaveSeq { continue } - // Skip ephemeral ops — they reflect transient UI state, not structural changes + if op.Type == "select" || op.Type == "unselect" || op.Type == "hover" || op.Type == "enter" { continue } @@ -338,7 +338,6 @@ func HandleStreamWorkflow(resp http.ResponseWriter, request *http.Request) { } } - // Signal to the client that catch-up is done — it can now arm its time filter fmt.Fprintf(resp, "%s\n", `{"item":"system","type":"init_complete"}`) conn.Flush() }