Skip to content
This repository was archived by the owner on Jul 18, 2025. It is now read-only.

Commit a9af616

Browse files
committed
Merge pull request #54 from dmcgowan/channel_reference_id
Update channel passing to use reference ids instead of stream ids
2 parents 8acda1e + 4dbdacb commit a9af616

3 files changed

Lines changed: 191 additions & 23 deletions

File tree

spdy/encode.go

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,28 @@ import (
1010
"github.com/dmcgowan/go/codec"
1111
)
1212

13+
func decodeReferenceID(b []byte) (referenceID uint64, err error) {
14+
if len(b) == 8 {
15+
referenceID = binary.BigEndian.Uint64(b)
16+
} else if len(b) == 4 {
17+
referenceID = uint64(binary.BigEndian.Uint32(b))
18+
} else {
19+
err = errors.New("bad reference id")
20+
}
21+
return
22+
}
23+
24+
func encodeReferenceID(b []byte, referenceID uint64) (n int) {
25+
if referenceID > 0xffffffff {
26+
binary.BigEndian.PutUint64(b, referenceID)
27+
n = 8
28+
} else {
29+
binary.BigEndian.PutUint32(b, uint32(referenceID))
30+
n = 4
31+
}
32+
return
33+
}
34+
1335
func (s *Transport) encodeChannel(v reflect.Value) ([]byte, error) {
1436
rc := v.Interface().(channel)
1537
if rc.stream == nil {
@@ -20,19 +42,15 @@ func (s *Transport) encodeChannel(v reflect.Value) ([]byte, error) {
2042
}
2143

2244
// Get stream identifier?
23-
streamID := rc.stream.Identifier()
24-
var buf [9]byte
45+
buf := make([]byte, 9)
2546
if rc.direction == inbound {
2647
buf[0] = 0x02 // Reverse direction
2748
} else if rc.direction == outbound {
2849
buf[0] = 0x01 // Reverse direction
2950
} else {
3051
return nil, errors.New("invalid direction")
3152
}
32-
written := binary.PutUvarint(buf[1:], uint64(streamID))
33-
if written > 4 {
34-
return nil, errors.New("wrote unexpected stream id size")
35-
}
53+
written := encodeReferenceID(buf[1:], rc.referenceID)
3654
return buf[:(written + 1)], nil
3755
}
3856

@@ -46,18 +64,16 @@ func (s *Transport) decodeChannel(v reflect.Value, b []byte) error {
4664
} else {
4765
return errors.New("unexpected direction")
4866
}
49-
50-
streamID, readN := binary.Uvarint(b[1:])
51-
if readN > 4 {
52-
return errors.New("read unexpected stream id size")
67+
referenceID, err := decodeReferenceID(b[1:])
68+
if err != nil {
69+
return err
5370
}
54-
stream := s.conn.FindStream(uint32(streamID))
55-
if stream == nil {
56-
return errors.New("stream does not exist")
71+
72+
c := s.getChannel(referenceID)
73+
if c == nil {
74+
return errors.New("channel does not exist")
5775
}
58-
rc.session = s
59-
rc.stream = stream
60-
v.Set(reflect.ValueOf(rc))
76+
v.Set(reflect.ValueOf(*c))
6177

6278
return nil
6379
}
@@ -84,15 +100,15 @@ func (s *Transport) encodeStream(v reflect.Value) ([]byte, error) {
84100
return nil, errors.New("bad type")
85101
}
86102
var buf [8]byte
87-
written := binary.PutUvarint(buf[:], uint64(bs.referenceID))
103+
written := encodeReferenceID(buf[:], bs.referenceID)
88104

89105
return buf[:written], nil
90106
}
91107

92108
func (s *Transport) decodeStream(v reflect.Value, b []byte) error {
93-
referenceID, readN := binary.Uvarint(b)
94-
if readN == 0 {
95-
return errors.New("bad reference id")
109+
referenceID, err := decodeReferenceID(b)
110+
if err != nil {
111+
return err
96112
}
97113

98114
bs := s.getByteStream(referenceID)

spdy/session.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -328,9 +328,11 @@ func (c *channel) createSubChannel(direction direction) (libchan.Sender, libchan
328328
return nil, nil, streamErr
329329
}
330330
subChannel := &channel{
331-
stream: stream,
332-
session: c.session,
333-
direction: direction,
331+
referenceID: referenceID,
332+
parentID: c.referenceID,
333+
stream: stream,
334+
session: c.session,
335+
direction: direction,
334336
}
335337

336338
c.session.channelC.L.Lock()

spdy/session_test.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,156 @@ func TestWrappedByteStreams(t *testing.T) {
284284
SpawnClientServerTest(t, "localhost:12943", ClientSendWrapper(client), ServerReceiveWrapper(server))
285285
}
286286

287+
type ReceiverMessage struct {
288+
Message string
289+
Receiver libchan.Receiver
290+
}
291+
292+
func TestSubChannel(t *testing.T) {
293+
client := func(t *testing.T, sender libchan.Sender, s *Transport) {
294+
remote1, send1 := libchan.Pipe()
295+
m1 := &ReceiverMessage{
296+
Message: "WithReceiver",
297+
Receiver: remote1,
298+
}
299+
if sendErr := sender.Send(m1); sendErr != nil {
300+
t.Fatalf("Error sending ReceiverMessage: %s", sendErr)
301+
}
302+
303+
remote2, send2 := libchan.Pipe()
304+
m2 := &ReceiverMessage{
305+
Message: "Nested",
306+
Receiver: remote2,
307+
}
308+
if sendErr := send1.Send(m2); sendErr != nil {
309+
t.Fatalf("Error sending ReceiverMessage: %s", sendErr)
310+
}
311+
312+
m3 := &SimpleMessage{"This is a simple message"}
313+
if sendErr := send2.Send(m3); sendErr != nil {
314+
t.Fatalf("Error sending simple message: %s", sendErr)
315+
}
316+
317+
if closeErr := send1.Close(); closeErr != nil {
318+
t.Fatalf("Error closing send1: %s", closeErr)
319+
}
320+
321+
if closeErr := send2.Close(); closeErr != nil {
322+
t.Fatalf("Error closing send2: %s", closeErr)
323+
}
324+
}
325+
server := func(t *testing.T, receiver libchan.Receiver, s *Transport) {
326+
m1 := &ReceiverMessage{}
327+
if receiveErr := receiver.Receive(m1); receiveErr != nil {
328+
t.Fatalf("Error receiving ReceiverMessage: %s", receiveErr)
329+
}
330+
331+
if expected := "WithReceiver"; m1.Message != expected {
332+
t.Fatalf("Unexpected message\n\tExpected: %s\n\tActual: %s", expected, m1.Message)
333+
}
334+
335+
if m1.Receiver == nil {
336+
t.Fatalf("Receiver is nil")
337+
}
338+
339+
m2 := &ReceiverMessage{}
340+
if receiveErr := m1.Receiver.Receive(m2); receiveErr != nil {
341+
t.Fatalf("Error receiving ReceiverMessage: %s", receiveErr)
342+
}
343+
if expected := "Nested"; m2.Message != expected {
344+
t.Fatalf("Unexpected message value:\n\tExpected: %s\n\tActual: %s", expected, m2.Message)
345+
}
346+
347+
if m2.Receiver == nil {
348+
t.Fatalf("Receiver is nil")
349+
}
350+
351+
m3 := &SimpleMessage{}
352+
if receiverErr := m2.Receiver.Receive(m3); receiverErr != nil {
353+
t.Fatalf("Error receiving SimpleMessage: %s", receiverErr)
354+
}
355+
if expected := "This is a simple message"; m3.Message != expected {
356+
t.Fatalf("Unexpected message value:\n\tExpected: %s\n\tActual: %s", expected, m3.Message)
357+
}
358+
}
359+
SpawnClientServerTest(t, "localhost:12845", ClientSendWrapper(client), ServerReceiveWrapper(server))
360+
}
361+
362+
type SenderMessage struct {
363+
Message string
364+
Sender libchan.Sender
365+
}
366+
367+
func TestSenderSubChannel(t *testing.T) {
368+
client := func(t *testing.T, sender libchan.Sender, s *Transport) {
369+
recv, remote1 := libchan.Pipe()
370+
m1 := &SenderMessage{
371+
Message: "WithSender",
372+
Sender: remote1,
373+
}
374+
if sendErr := sender.Send(m1); sendErr != nil {
375+
t.Fatalf("Error sending SenderMessage: %s", sendErr)
376+
}
377+
378+
m2 := &SenderMessage{}
379+
if receiveErr := recv.Receive(m2); receiveErr != nil {
380+
t.Fatalf("Error receiving ReceiverMessage: %s", receiveErr)
381+
}
382+
if expected := "Nested"; m2.Message != expected {
383+
t.Fatalf("Unexpected message value:\n\tExpected: %s\n\tActual: %s", expected, m2.Message)
384+
}
385+
386+
if m2.Sender == nil {
387+
t.Fatalf("Receiver is nil")
388+
}
389+
390+
m3 := &SimpleMessage{"This is a simple message"}
391+
if sendErr := m2.Sender.Send(m3); sendErr != nil {
392+
t.Fatalf("Error sending simple message: %s", sendErr)
393+
}
394+
395+
if closeErr := m2.Sender.Close(); closeErr != nil {
396+
t.Fatalf("Error closing send2: %s", closeErr)
397+
}
398+
}
399+
server := func(t *testing.T, receiver libchan.Receiver, s *Transport) {
400+
m1 := &SenderMessage{}
401+
if receiveErr := receiver.Receive(m1); receiveErr != nil {
402+
t.Fatalf("Error receiving SenderMessage: %s", receiveErr)
403+
}
404+
405+
if expected := "WithSender"; m1.Message != expected {
406+
t.Fatalf("Unexpected message\n\tExpected: %s\n\tActual: %s", expected, m1.Message)
407+
}
408+
409+
if m1.Sender == nil {
410+
t.Fatalf("Receiver is nil")
411+
}
412+
413+
recv, remote := libchan.Pipe()
414+
m2 := &SenderMessage{
415+
Message: "Nested",
416+
Sender: remote,
417+
}
418+
if sendErr := m1.Sender.Send(m2); sendErr != nil {
419+
t.Fatalf("Error sending SenderMessage: %s", sendErr)
420+
}
421+
422+
m3 := &SimpleMessage{}
423+
if receiverErr := recv.Receive(m3); receiverErr != nil {
424+
t.Fatalf("Error receiving SimpleMessage: %s", receiverErr)
425+
}
426+
if expected := "This is a simple message"; m3.Message != expected {
427+
t.Fatalf("Unexpected message value:\n\tExpected: %s\n\tActual: %s", expected, m3.Message)
428+
}
429+
430+
if closeErr := m1.Sender.Close(); closeErr != nil {
431+
t.Fatalf("Error closing send1: %s", closeErr)
432+
}
433+
434+
}
435+
SpawnClientServerTest(t, "localhost:12845", ClientSendWrapper(client), ServerReceiveWrapper(server))
436+
}
287437
func ClientSendWrapper(f func(t *testing.T, c libchan.Sender, s *Transport)) ClientRoutine {
288438
return func(t *testing.T, server string) {
289439
conn, connErr := net.Dial("tcp", server)

0 commit comments

Comments
 (0)