implement a StreamFrame Queue to handle high priority StreamFrames for retransmission

fixes #21
This commit is contained in:
Marten Seemann
2016-04-27 21:05:58 +07:00
parent c641e24e8c
commit 6a7f331269
4 changed files with 190 additions and 12 deletions

View File

@@ -2,7 +2,6 @@ package quic
import (
"bytes"
"sync"
"sync/atomic"
"github.com/lucas-clemente/quic-go/crypto"
@@ -22,22 +21,21 @@ type packetPacker struct {
connectionID protocol.ConnectionID
aead crypto.AEAD
queuedStreamFrames []frames.StreamFrame
mutex sync.Mutex
streamFrameQueue StreamFrameQueue
lastPacketNumber protocol.PacketNumber
}
func (p *packetPacker) AddStreamFrame(f frames.StreamFrame) {
p.mutex.Lock()
p.queuedStreamFrames = append(p.queuedStreamFrames, f)
p.mutex.Unlock()
p.streamFrameQueue.Push(&f, false)
}
func (p *packetPacker) AddHighPrioStreamFrame(f frames.StreamFrame) {
p.streamFrameQueue.Push(&f, true)
}
func (p *packetPacker) PackPacket(controlFrames []frames.Frame, includeStreamFrames bool) (*packedPacket, error) {
// TODO: save controlFrames as a member variable, makes it easier to handle in the unlikely event that there are more controlFrames than you can put into on packet
p.mutex.Lock()
defer p.mutex.Unlock() // TODO: Split up?
payloadFrames, err := p.composeNextPacket(controlFrames, includeStreamFrames)
if err != nil {
@@ -119,8 +117,8 @@ func (p *packetPacker) composeNextPacket(controlFrames []frames.Frame, includeSt
return payloadFrames, nil
}
for len(p.queuedStreamFrames) > 0 {
frame := &p.queuedStreamFrames[0]
for p.streamFrameQueue.Len() > 0 {
frame := p.streamFrameQueue.Front()
if payloadLength > protocol.MaxFrameSize {
panic("internal inconsistency: packet payload too large")
@@ -138,7 +136,7 @@ func (p *packetPacker) composeNextPacket(controlFrames []frames.Frame, includeSt
frame = previousFrame
payloadLength += len(previousFrame.Data) - 1
} else {
p.queuedStreamFrames = p.queuedStreamFrames[1:]
p.streamFrameQueue.Pop()
payloadLength += len(frame.Data) - 1
}

View File

@@ -262,7 +262,7 @@ func (s *Session) sendPacket() error {
controlFrames = append(controlFrames, retransmitPacket.GetControlFramesForRetransmission()...)
for _, streamFrame := range retransmitPacket.GetStreamFramesForRetransmission() {
// TODO: add these stream frames with a higher priority
s.packer.AddStreamFrame(*streamFrame)
s.packer.AddHighPrioStreamFrame(*streamFrame)
}
}

66
stream_frame_queue.go Normal file
View File

@@ -0,0 +1,66 @@
package quic
import (
"sync"
"github.com/lucas-clemente/quic-go/frames"
)
// StreamFrameQueue is a Queue that handles StreamFrames
type StreamFrameQueue struct {
prioFrames []*frames.StreamFrame
frames []*frames.StreamFrame
mutex sync.Mutex
}
// Push adds a new StreamFrame to the queue
func (q *StreamFrameQueue) Push(frame *frames.StreamFrame, prio bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
if prio {
q.prioFrames = append(q.prioFrames, frame)
} else {
q.frames = append(q.frames, frame)
}
}
// Len returns the total number of queued StreamFrames
func (q *StreamFrameQueue) Len() int {
q.mutex.Lock()
defer q.mutex.Unlock()
return len(q.prioFrames) + len(q.frames)
}
// Pop returns the next element and deletes it from the queue
func (q *StreamFrameQueue) Pop() *frames.StreamFrame {
q.mutex.Lock()
defer q.mutex.Unlock()
if len(q.prioFrames) > 0 {
frame := q.prioFrames[0]
q.prioFrames = q.prioFrames[1:]
return frame
}
if len(q.frames) > 0 {
frame := q.frames[0]
q.frames = q.frames[1:]
return frame
}
return nil
}
// Front returns the next element without modifying the queue
func (q *StreamFrameQueue) Front() *frames.StreamFrame {
q.mutex.Lock()
defer q.mutex.Unlock()
if len(q.prioFrames) > 0 {
return q.prioFrames[0]
}
if len(q.frames) > 0 {
return q.frames[0]
}
return nil
}

114
stream_frame_queue_test.go Normal file
View File

@@ -0,0 +1,114 @@
package quic
import (
"github.com/lucas-clemente/quic-go/frames"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("StreamFrameQueue", func() {
var prioFrame1, prioFrame2 *frames.StreamFrame
var frame1, frame2 *frames.StreamFrame
var queue StreamFrameQueue
BeforeEach(func() {
queue = StreamFrameQueue{}
prioFrame1 = &frames.StreamFrame{
StreamID: 5,
Data: []byte{0x13, 0x37},
}
prioFrame2 = &frames.StreamFrame{
StreamID: 6,
Data: []byte{0xDE, 0xCA, 0xFB, 0xAD},
}
frame1 = &frames.StreamFrame{
StreamID: 10,
Data: []byte{0xCA, 0xFE},
}
frame2 = &frames.StreamFrame{
StreamID: 11,
Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
}
})
Context("Queue Length", func() {
It("returns the correct length for an empty queue", func() {
Expect(queue.Len()).To(BeZero())
})
It("returns the correct lengths for a queue", func() {
queue.Push(prioFrame1, true)
Expect(queue.Len()).To(Equal(1))
queue.Push(frame1, false)
queue.Push(frame2, false)
Expect(queue.Len()).To(Equal(3))
})
It("returns the correct length when popping", func() {
queue.Push(prioFrame1, true)
queue.Push(prioFrame2, true)
queue.Push(frame1, false)
queue.Push(frame2, false)
Expect(queue.Len()).To(Equal(4))
queue.Pop()
Expect(queue.Len()).To(Equal(3))
queue.Pop()
queue.Pop()
queue.Pop()
Expect(queue.Len()).To(Equal(0))
})
})
Context("Popping", func() {
It("returns nil when popping an empty queue", func() {
Expect(queue.Pop()).To(BeNil())
})
It("deletes elements once they are popped", func() {
queue.Push(frame1, false)
Expect(queue.Pop()).To(Equal(frame1))
Expect(queue.Pop()).To(BeNil())
})
It("returns normal frames if no prio frames are available", func() {
queue.Push(frame1, false)
queue.Push(frame2, false)
Expect(queue.Pop()).To(Equal(frame1))
Expect(queue.Pop()).To(Equal(frame2))
})
It("returns prio frames first", func() {
queue.Push(prioFrame1, true)
queue.Push(frame1, false)
queue.Push(frame2, false)
queue.Push(prioFrame2, true)
Expect(queue.Pop()).To(Equal(prioFrame1))
Expect(queue.Pop()).To(Equal(prioFrame2))
Expect(queue.Pop()).To(Equal(frame1))
})
})
Context("Front", func() {
It("returns nil for an empty queue", func() {
Expect(queue.Front()).To(BeNil())
})
It("returns normal frames if no prio frames are available", func() {
queue.Push(frame1, false)
queue.Push(frame2, false)
Expect(queue.Front()).To(Equal(frame1))
Expect(queue.Len()).To(Equal(2))
Expect(queue.Front()).To(Equal(frame1))
Expect(queue.Len()).To(Equal(2))
})
It("returns prio frames first", func() {
queue.Push(prioFrame1, true)
queue.Push(frame1, false)
queue.Push(frame2, false)
queue.Push(prioFrame2, true)
Expect(queue.Front()).To(Equal(prioFrame1))
Expect(queue.Len()).To(Equal(4))
})
})
})