2 changes: 1 addition & 1 deletion builder/sizes_test.go
@@ -44,7 +44,7 @@ func TestBinarySize(t *testing.T) {
// microcontrollers
{"hifive1b", "examples/echo", 3680, 280, 0, 2252},
{"microbit", "examples/serial", 2694, 342, 8, 2248},
{"wioterminal", "examples/pininterrupt", 7074, 1510, 120, 7248},
{"wioterminal", "examples/pininterrupt", 7184, 1508, 120, 7256},

// TODO: also check wasm. Right now this is difficult, because
// wasm binaries are run through wasm-opt and therefore the
76 changes: 57 additions & 19 deletions src/machine/usb/cdc/usbcdc.go
@@ -26,11 +26,23 @@ type cdcLineInfo struct {

// USBCDC is the USB CDC aka serial over USB interface.
type USBCDC struct {
tx ring512
rx ring512
tx ring512
rx ring512

// inflight is the number of bytes currently submitted to the USB IN endpoint.
inflight atomic.Uint32
rbuf [1]byte
wbuf [1]byte

// txActive is the TX-pump ownership flag: 0 = idle, 1 = a pump owns the TX
// path. Claimed once (kickTx, CAS 0->1), held across every in-flight packet
// and the TX-complete IRQ, and released only when the ring drains. While it
// is set, kickTx's CAS fails and no second pump starts, which serializes the
// pump against Write across cores. Same model as Linux NAPI_STATE_SCHED:
// held across completion, dropped only with a recheck
// (Documentation/networking/napi.rst).
txActive atomic.Uint32

rbuf [1]byte
wbuf [1]byte
}

var (
@@ -81,7 +93,7 @@ func (usbcdc *USBCDC) Configure(config machine.UARTConfig) error {

// Flush flushes buffered data.
func (usbcdc *USBCDC) Flush() {
for usbcdc.tx.Used() > 0 {
for usbcdc.tx.Used() > 0 || usbcdc.txActive.Load() != 0 {
gosched()
}
}
@@ -105,33 +117,59 @@ func (usbcdc *USBCDC) Write(data []byte) (n int, err error) {
return n, nil
}

// kickTx starts a transfer if none is in flight. Called from main context only.
// kickTx claims the TX pump for a producer. This CAS is the only start-from-idle
// edge; if it fails, a pump already owns the path and will drain what we just
// enqueued -- see the recheck in sendFromRing.
func (usbcdc *USBCDC) kickTx() {
if usbcdc.inflight.Load() > 0 {
return // txhandler will chain the next packet.
if !usbcdc.txActive.CompareAndSwap(0, 1) {
return
}
usbcdc.sendFromRing()
}

func (usbcdc *USBCDC) txhandler() {
// TX-complete IRQ. The pump is still owned here (txActive stayed 1 across the
// in-flight packet), so continue WITHOUT re-claiming -- pairs with the CAS in
// kickTx. A CAS here would see the flag already set, bail, and stall the chain.
inflight := usbcdc.inflight.Load()
usbcdc.inflight.Store(0)
if inflight == 0 {
return
}
usbcdc.tx.Discard(inflight)
usbcdc.inflight.Store(0)
usbcdc.sendFromRing()
Contributor

It seems odd to me that this is not guarded with a txActive compare-and-swap. My issue with this PR as it stands is that it is not clear to me what txActive is guarding. From the looks of it, sendFromRing can be called concurrently from kickTx and txhandler, and we've added a bunch of logic in sendFromRing to deal with this. I feel like we could keep sendFromRing as a simple function that does what it says and guard it against concurrent execution, since I suspect concurrent execution is not necessary for this to work correctly.

Maybe we can even delete kickTx, add the guard at the start of sendFromRing, and do a simple Store(0) as soon as sendFromRing finishes its work?

Member

That sounds very reasonable, and less complex. What do you think @rdon-key?

Contributor Author

txActive is not only guarding the lexical execution of sendFromRing. It represents ownership of the asynchronous TX pump.

This is the usual queue + active-flag missed-wakeup pattern: while the active flag is set, producers may enqueue more work but do not start another worker. Therefore, when the current owner is about to go idle, it must clear the active flag and then re-check the queue before returning.

In this case, sendFromRing submits one USB IN packet and then returns, but the TX pump must still be considered active while that packet is in flight. The ownership is continued by txhandler when the TX completion arrives.

That is why txhandler does not acquire txActive with CAS before calling sendFromRing: it is continuing the ownership that was acquired by kickTx.

If we put the CAS at the start of sendFromRing, the completion path would normally see txActive already set and fail to continue the pump. If we clear txActive immediately when sendFromRing returns after submitting a packet, Write could acquire it and call SendUSBInPacket again while the previous packet is still in flight, which is the race this PR is trying to avoid.

The intended ownership model:

  • kickTx acquires txActive when the TX pump is idle.
  • sendFromRing sends one packet while ownership is held.
  • txhandler continues the same ownership across TX completions.
  • ownership is released only when the ring appears empty, with a final re-check to avoid a missed wakeup.
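
The ownership model above can be sketched as a small single-threaded simulation. This is only an illustration, not the driver code: the `pump` type, its `write`, `kick`, `complete`, and `step` methods, and the 4-byte `packetSize` are hypothetical stand-ins for the tx ring, kickTx, txhandler, sendFromRing, and usb.EndpointPacketSize. Because it runs on one goroutine, the recheck after the release never finds new data here; in the real driver a producer could enqueue in that window.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// packetSize is a hypothetical stand-in for usb.EndpointPacketSize.
const packetSize = 4

// pump is a hypothetical model of the TX path; it is not the TinyGo driver.
type pump struct {
	queue    []byte        // stands in for the tx ring (single-threaded here)
	inflight int           // bytes submitted and not yet completed
	active   atomic.Uint32 // 0 = idle, 1 = a pump owns the TX path
	sent     []string      // packets "submitted to hardware", for inspection
}

// write enqueues data, then tries to start the pump.
func (p *pump) write(data []byte) {
	p.queue = append(p.queue, data...)
	p.kick()
}

// kick is the only start-from-idle edge: it claims ownership with a CAS.
func (p *pump) kick() {
	if !p.active.CompareAndSwap(0, 1) {
		return // a pump already owns the path and will drain the queue
	}
	p.step()
}

// complete models the TX-complete IRQ. It continues the ownership that
// kick claimed, so it does not CAS again.
func (p *pump) complete() {
	if p.inflight == 0 {
		return
	}
	p.queue = p.queue[p.inflight:]
	p.inflight = 0
	p.step()
}

// step submits one packet, or releases ownership if the queue is empty.
func (p *pump) step() {
	for {
		if len(p.queue) == 0 {
			p.active.Store(0) // release first
			if len(p.queue) == 0 {
				return // recheck found nothing: go idle
			}
			if !p.active.CompareAndSwap(0, 1) {
				return // a producer re-claimed the pump
			}
			continue // re-claimed: keep pumping
		}
		n := len(p.queue)
		if n > packetSize {
			n = packetSize
		}
		p.inflight = n
		p.sent = append(p.sent, string(p.queue[:n]))
		return // packet in flight; ownership stays held
	}
}

func main() {
	p := &pump{}
	p.write([]byte("hello!")) // claims the pump and submits "hell"
	p.write([]byte("ok"))     // CAS fails; the data is only enqueued
	fmt.Println(p.sent, p.active.Load())
	p.complete() // submits "o!ok" without re-claiming
	p.complete() // queue is empty: ownership is released
	fmt.Println(p.sent, p.active.Load())
}
```

In this sketch the second `write` never submits a packet itself; its bytes go out from `complete`, which is the behavior the flag is meant to enforce.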

Contributor

I'm having a really hard time working through the logic. I asked Google AI to create an abstraction that takes your approach into account. I do like the idea of defining a type that encapsulates the task pump functionality. Here's what the AI came up with. Note that two abstractions are provided: the first uses a dynamic call, and the second embeds the task pump logic more cleanly. Let me know if either of them works and whether this sounds like a good path forward.

https://share.google/aimode/CPm4rcMLZXj16sTsA

Contributor Author

Thanks @soypat — I did look at both sketches.

The main issue is that they don't capture the TX pump's IRQ boundary. The TX pump isn't a run-to-completion loop: it sends one packet, then has to suspend while keeping ownership, and resume from the TX-completion IRQ (txhandler) without re-acquiring.

As written, both sketches seem to collapse that boundary: they put Kick()/CAS in txhandler, so the pump can stall after the first packet while txActive is still held. To make the abstraction actually span the IRQ boundary, it would need a separate continuation entry that skips the CAS, an explicit yield / in-flight state, and the release-with-recheck path.

At that point, I don't think the abstraction is simpler than what's here now — it just spreads the same state machine across a generic type and the driver.

So for this PR I'd like to keep the scope minimal. It's a regression fix for #5377, and I'd rather land the smallest change that closes the race. If we want to revisit a TaskPump-style abstraction, I think that should be a separate follow-up issue with its own review and on-hardware testing.
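
The stall described in this reply can be reproduced with a minimal model. This is a sketch under stated assumptions, not the linked sketches or the driver: `chain`, `submit`, `kick`, `completeWithCAS`, `completeContinuing`, and `run` are all hypothetical names, and the model deliberately omits the release-with-recheck step because nothing is enqueued after the first kick.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// chain is a hypothetical model of a packet chain; it is not the TinyGo driver.
type chain struct {
	pending int           // packets still waiting to be sent
	sent    int           // packets submitted so far
	active  atomic.Uint32 // 1 while a pump owns the TX path
}

// submit sends one packet if any remain; otherwise it releases ownership.
func (c *chain) submit() {
	if c.pending == 0 {
		c.active.Store(0)
		return
	}
	c.pending--
	c.sent++
}

// kick starts the chain from idle by claiming ownership with a CAS.
func (c *chain) kick() {
	if c.active.CompareAndSwap(0, 1) {
		c.submit()
	}
}

// completeWithCAS models a completion handler that tries to re-claim
// ownership. The flag is still set from kick, so the CAS fails and
// nothing further is sent.
func (c *chain) completeWithCAS() { c.kick() }

// completeContinuing models a handler that continues the existing ownership.
func (c *chain) completeContinuing() { c.submit() }

// run queues three packets, kicks once, then delivers three completions.
func run(useCAS bool) (sent int, active uint32) {
	c := &chain{pending: 3}
	c.kick()
	for i := 0; i < 3; i++ {
		if useCAS {
			c.completeWithCAS()
		} else {
			c.completeContinuing()
		}
	}
	return c.sent, c.active.Load()
}

func main() {
	sent, active := run(true)
	fmt.Println(sent, active) // 1 1: stalled after the first packet, flag still held
	sent, active = run(false)
	fmt.Println(sent, active) // 3 0: all packets sent, ownership released
}
```

With the CAS in the completion path, only the first packet goes out and the flag is never cleared, which matches the "stall after the first packet while txActive is still held" case.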

}

// sendFromRing sends one USB packet from the ring and sets inflight.
// Called from kickTx (main) or txhandler (ISR), but never concurrently
// because kickTx only runs when inflight==0 and txhandler only runs
// when inflight>0.
// sendFromRing runs one step of the TX pump: submit one IN packet, or release the
// pump if the ring is empty. Precondition: txActive == 1 (from kickTx's CAS, or
// still held from the previous packet when entered via txhandler).
func (usbcdc *USBCDC) sendFromRing() {
d1, _ := usbcdc.tx.Peek()
if len(d1) == 0 {
return
for {
d1, _ := usbcdc.tx.Peek()
if len(d1) == 0 {
// Release the pump, then re-scan the ring: closes the missed-wakeup
// race where Write Put()s data and kickTx's CAS then fails (txActive
// still set), leaving the data for this pump to drain. The Store(0)
// is ordered before the Used() load -- and, in the producer, Put()
// before its CAS -- by the sequential consistency of Go's atomics, so
// neither side misses the other (assumes the ring's accesses are
// atomic too). cf. napi_complete_done() clearing NAPI_STATE_SCHED
// then rechecking.
usbcdc.txActive.Store(0)
if usbcdc.tx.Used() == 0 {
return // ring empty and pump released; done
}
if !usbcdc.txActive.CompareAndSwap(0, 1) {
return // another producer re-claimed the pump; let it run
}
continue // re-claimed; re-peek and keep pumping
}

chunk := d1[:min(usb.EndpointPacketSize, len(d1))]
usbcdc.inflight.Store(uint32(len(chunk)))
machine.SendUSBInPacket(cdcEndpointIn, chunk)
return // in flight; txActive stays set, txhandler continues
}
chunk := d1[:min(usb.EndpointPacketSize, len(d1))]
usbcdc.inflight.Store(uint32(len(chunk)))
machine.SendUSBInPacket(cdcEndpointIn, chunk)
}

// WriteByte writes a byte of data to the USB CDC interface.