move splitting of StreamFrames to StreamFrameQueue

work towards #146
This commit is contained in:
Marten Seemann
2016-05-26 11:14:38 +07:00
parent 93771708ae
commit 758334f6aa
5 changed files with 170 additions and 125 deletions

View File

@@ -199,24 +199,3 @@ func (f *StreamFrame) MinLength() (protocol.ByteCount, error) {
return length + 1, nil
}
// MaybeSplitOffFrame removes the first n bytes and returns them as a separate frame. If n >= len(n), nil is returned and nothing is modified.
func (f *StreamFrame) MaybeSplitOffFrame(n protocol.ByteCount) *StreamFrame {
minLength, _ := f.MinLength() // StreamFrame.MinLength *never* errors
if n >= minLength-1+protocol.ByteCount(len(f.Data)) {
return nil
}
n -= minLength - 1
defer func() {
f.Data = f.Data[n:]
f.Offset += n
}()
return &StreamFrame{
FinBit: false,
StreamID: f.StreamID,
Offset: f.Offset,
Data: f.Data[:n],
}
}

View File

@@ -371,36 +371,4 @@ var _ = Describe("StreamFrame", func() {
Expect(f.getOffsetLength()).To(Equal(protocol.ByteCount(8)))
})
})
Context("splitting off earlier stream frames", func() {
It("splits off nothing", func() {
f := &StreamFrame{
StreamID: 1,
Data: []byte("bar"),
Offset: 3,
}
Expect(f.MaybeSplitOffFrame(1000)).To(BeNil())
Expect(f.Offset).To(Equal(protocol.ByteCount(3)))
})
It("splits off initial frame", func() {
f := &StreamFrame{
StreamID: 1,
Data: []byte("foobar"),
Offset: 3,
FinBit: true,
}
minLength, _ := f.MinLength()
previous := f.MaybeSplitOffFrame(minLength - 1 + 3)
Expect(previous).ToNot(BeNil())
Expect(previous.StreamID).To(Equal(protocol.StreamID(1)))
Expect(previous.Data).To(Equal([]byte("foo")))
Expect(previous.Offset).To(Equal(protocol.ByteCount(3)))
Expect(previous.FinBit).To(BeFalse())
Expect(f.StreamID).To(Equal(protocol.StreamID(1)))
Expect(f.Data).To(Equal([]byte("bar")))
Expect(f.Offset).To(Equal(protocol.ByteCount(6)))
Expect(f.FinBit).To(BeTrue())
})
})
})

View File

@@ -197,34 +197,23 @@ func (p *packetPacker) composeNextPacket(stopWaitingFrame *frames.StopWaitingFra
maxFrameSize += 2
for p.streamFrameQueue.Len() > 0 {
frame := p.streamFrameQueue.Front()
frame.DataLenPresent = true // set the dataLen by default. Remove them later if applicable
if payloadLength > maxFrameSize {
return nil, errors.New("PacketPacker BUG: packet payload too large")
}
// Does the frame fit into the remaining space?
frameMinLength, _ := frame.MinLength() // StreamFrame.MinLength *never* returns an error
if payloadLength+frameMinLength > maxFrameSize {
frame := p.streamFrameQueue.Pop(maxFrameSize - payloadLength)
if frame == nil {
break
}
frame.DataLenPresent = true // set the dataLen by default. Remove them later if applicable
// Split stream frames if necessary
previousFrame := frame.MaybeSplitOffFrame(maxFrameSize - payloadLength)
if previousFrame != nil {
// Don't pop the queue, leave the modified frame in
frame = previousFrame
payloadLength += protocol.ByteCount(len(previousFrame.Data)) - 1
} else {
p.streamFrameQueue.Pop()
payloadLength += protocol.ByteCount(len(frame.Data)) - 1
}
frameMinLength, _ := frame.MinLength() // StreamFrame.MinLength *never* returns an error
payloadLength += frameMinLength - 1 + protocol.ByteCount(len(frame.Data))
blockedFrame := p.blockedManager.GetBlockedFrame(frame.StreamID, frame.Offset+protocol.ByteCount(len(frame.Data)))
if blockedFrame != nil {
blockedLength, _ := blockedFrame.MinLength() // BlockedFrame.MinLength *never* returns an error
if payloadLength+frameMinLength+blockedLength <= maxFrameSize {
if payloadLength+blockedLength <= maxFrameSize {
payloadFrames = append(payloadFrames, blockedFrame)
payloadLength += blockedLength
} else {
@@ -232,7 +221,6 @@ func (p *packetPacker) composeNextPacket(stopWaitingFrame *frames.StopWaitingFra
}
}
payloadLength += frameMinLength
payloadFrames = append(payloadFrames, frame)
hasStreamFrames = true
}
@@ -252,7 +240,7 @@ func (p *packetPacker) composeNextPacket(stopWaitingFrame *frames.StopWaitingFra
// Empty returns true if no frames are queued
func (p *packetPacker) Empty() bool {
return p.streamFrameQueue.Front() == nil
return p.streamFrameQueue.ByteLen() == 0
}
func (p *packetPacker) StreamFrameQueueByteLen() protocol.ByteCount {

View File

@@ -11,7 +11,7 @@ import (
type streamFrameQueue struct {
prioFrames []*frames.StreamFrame
frames []*frames.StreamFrame
mutex sync.Mutex
mutex sync.RWMutex
}
// Push adds a new StreamFrame to the queue
@@ -19,6 +19,8 @@ func (q *streamFrameQueue) Push(frame *frames.StreamFrame, prio bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
frame.DataLenPresent = true
if prio {
q.prioFrames = append(q.prioFrames, frame)
} else {
@@ -28,16 +30,16 @@ func (q *streamFrameQueue) Push(frame *frames.StreamFrame, prio bool) {
// Len returns the total number of queued StreamFrames
func (q *streamFrameQueue) Len() int {
q.mutex.Lock()
defer q.mutex.Unlock()
q.mutex.RLock()
defer q.mutex.RUnlock()
return len(q.prioFrames) + len(q.frames)
}
// ByteLen returns the total number of bytes queued
func (q *streamFrameQueue) ByteLen() protocol.ByteCount {
q.mutex.Lock()
defer q.mutex.Unlock()
q.mutex.RLock()
defer q.mutex.RUnlock()
// TODO: improve performance
// This is a very unperformant implementation. However, the obvious solution of keeping track of the length on Push() and Pop() doesn't work, since the front frame can be split by the PacketPacker
@@ -53,33 +55,68 @@ func (q *streamFrameQueue) ByteLen() protocol.ByteCount {
}
// Pop returns the next element and deletes it from the queue
func (q *streamFrameQueue) Pop() *frames.StreamFrame {
func (q *streamFrameQueue) Pop(maxLength protocol.ByteCount) *frames.StreamFrame {
q.mutex.Lock()
defer q.mutex.Unlock()
if len(q.prioFrames) > 0 {
frame := q.prioFrames[0]
frame, isPrioFrame := q.front()
if frame == nil {
return nil
}
// Does the frame fit into the remaining space?
frameMinLength, _ := frame.MinLength() // StreamFrame.MinLength *never* returns an error
if frameMinLength > maxLength {
return nil
}
splitFrame := q.maybeSplitOffFrame(frame, maxLength)
if splitFrame != nil { // StreamFrame was split
return splitFrame
}
// StreamFrame was not split. Remove it from the appropriate queue
if isPrioFrame {
q.prioFrames = q.prioFrames[1:]
return frame
}
if len(q.frames) > 0 {
frame := q.frames[0]
} else {
q.frames = q.frames[1:]
return frame
}
return nil
return frame
}
// Front returns the next element without modifying the queue
func (q *streamFrameQueue) Front() *frames.StreamFrame {
q.mutex.Lock()
defer q.mutex.Unlock()
// front returns the next element without modifying the queue
// has to be called from a function that has already acquired the mutex
func (q *streamFrameQueue) front() (*frames.StreamFrame, bool) {
if len(q.prioFrames) > 0 {
return q.prioFrames[0]
return q.prioFrames[0], true
}
if len(q.frames) > 0 {
return q.frames[0]
return q.frames[0], false
}
return nil, false
}
// maybeSplitOffFrame removes the first n bytes and returns them as a separate frame. If n >= len(n), nil is returned and nothing is modified.
// has to be called from a function that has already acquired the mutex
func (q *streamFrameQueue) maybeSplitOffFrame(frame *frames.StreamFrame, n protocol.ByteCount) *frames.StreamFrame {
minLength, _ := frame.MinLength() // StreamFrame.MinLength *never* errors
if n >= minLength-1+protocol.ByteCount(len(frame.Data)) {
return nil
}
n -= minLength - 1
defer func() {
frame.Data = frame.Data[n:]
frame.Offset += n
}()
return &frames.StreamFrame{
FinBit: false,
StreamID: frame.StreamID,
Offset: frame.Offset,
Data: frame.Data[:n],
DataLenPresent: frame.DataLenPresent,
}
return nil
}

View File

@@ -24,7 +24,7 @@ var _ = Describe("streamFrameQueue", func() {
}
frame1 = &frames.StreamFrame{
StreamID: 10,
Data: []byte{0xCA, 0xFE, 0x13},
Data: []byte{0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0x13, 0x37},
}
frame2 = &frames.StreamFrame{
StreamID: 11,
@@ -51,19 +51,19 @@ var _ = Describe("streamFrameQueue", func() {
queue.Push(frame1, false)
queue.Push(frame2, false)
Expect(queue.Len()).To(Equal(4))
queue.Pop()
queue.Pop(1000)
Expect(queue.Len()).To(Equal(3))
queue.Pop()
queue.Pop()
queue.Pop()
queue.Pop(1000)
queue.Pop(1000)
queue.Pop(1000)
Expect(queue.Len()).To(Equal(0))
})
It("does not change the length when using Front()", func() {
It("does not change the length when using front()", func() {
queue.Push(prioFrame1, true)
queue.Push(frame1, false)
Expect(queue.Len()).To(Equal(2))
queue.Front()
queue.front()
Expect(queue.Len()).To(Equal(2))
})
})
@@ -78,48 +78,45 @@ var _ = Describe("streamFrameQueue", func() {
Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(2)))
queue.Push(frame1, false)
queue.Push(frame2, false)
Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(2 + 3 + 5)))
Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(len(prioFrame1.Data) + len(frame1.Data) + len(frame2.Data))))
})
It("returns the correct byte length when popping", func() {
queue.Push(prioFrame1, true)
queue.Push(prioFrame2, true)
queue.Push(frame1, false)
queue.Push(frame2, false)
Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(2 + 4 + 3 + 5)))
queue.Pop()
Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(4 + 3 + 5)))
queue.Pop()
queue.Pop()
queue.Pop()
Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(len(prioFrame1.Data) + len(frame1.Data))))
queue.Pop(1000)
Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(len(frame1.Data))))
queue.Pop(1000)
Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(0)))
})
It("does not change the byte length when using Front()", func() {
queue.Push(prioFrame1, true)
queue.Push(frame1, false)
Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(2 + 3)))
queue.Front()
Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(2 + 3)))
length := protocol.ByteCount(len(prioFrame1.Data) + len(frame1.Data))
Expect(queue.ByteLen()).To(Equal(length))
queue.front()
Expect(queue.ByteLen()).To(Equal(length))
})
})
Context("Popping", func() {
It("returns nil when popping an empty queue", func() {
Expect(queue.Pop()).To(BeNil())
Expect(queue.Pop(1000)).To(BeNil())
})
It("deletes elements once they are popped", func() {
queue.Push(frame1, false)
Expect(queue.Pop()).To(Equal(frame1))
Expect(queue.Pop()).To(BeNil())
Expect(queue.Pop(1000)).To(Equal(frame1))
Expect(queue.Pop(1000)).To(BeNil())
})
It("returns normal frames if no prio frames are available", func() {
queue.Push(frame1, false)
queue.Push(frame2, false)
Expect(queue.Pop()).To(Equal(frame1))
Expect(queue.Pop()).To(Equal(frame2))
Expect(queue.Pop(1000)).To(Equal(frame1))
Expect(queue.Pop(1000)).To(Equal(frame2))
})
It("returns prio frames first", func() {
@@ -127,22 +124,96 @@ var _ = Describe("streamFrameQueue", func() {
queue.Push(frame1, false)
queue.Push(frame2, false)
queue.Push(prioFrame2, true)
Expect(queue.Pop()).To(Equal(prioFrame1))
Expect(queue.Pop()).To(Equal(prioFrame2))
Expect(queue.Pop()).To(Equal(frame1))
frame := queue.Pop(1000)
Expect(frame).To(Equal(prioFrame1))
frame = queue.Pop(1000)
Expect(frame).To(Equal(prioFrame2))
frame = queue.Pop(1000)
Expect(frame).To(Equal(frame1))
})
Context("splitting of frames", func() {
It("splits off nothing", func() {
f := &frames.StreamFrame{
StreamID: 1,
Data: []byte("bar"),
Offset: 3,
}
Expect(queue.maybeSplitOffFrame(f, 1000)).To(BeNil())
Expect(f.Offset).To(Equal(protocol.ByteCount(3)))
})
It("splits off initial frame", func() {
f := &frames.StreamFrame{
StreamID: 1,
Data: []byte("foobar"),
Offset: 3,
FinBit: true,
}
minLength, _ := f.MinLength()
previous := queue.maybeSplitOffFrame(f, minLength-1+3)
Expect(previous).ToNot(BeNil())
Expect(previous.StreamID).To(Equal(protocol.StreamID(1)))
Expect(previous.Data).To(Equal([]byte("foo")))
Expect(previous.Offset).To(Equal(protocol.ByteCount(3)))
Expect(previous.FinBit).To(BeFalse())
Expect(f.StreamID).To(Equal(protocol.StreamID(1)))
Expect(f.Data).To(Equal([]byte("bar")))
Expect(f.Offset).To(Equal(protocol.ByteCount(6)))
Expect(f.FinBit).To(BeTrue())
})
It("splits a frame", func() {
queue.Push(frame1, false)
origlen := len(frame1.Data)
frame := queue.Pop(6)
minLength, _ := frame.MinLength()
Expect(int(minLength) - 1 + len(frame.Data)).To(Equal(6))
Expect(queue.frames[0].Data).To(HaveLen(origlen - len(frame.Data)))
Expect(queue.frames[0].Offset).To(Equal(protocol.ByteCount(len(frame.Data))))
})
It("only removes a frame from the queue after return all split parts", func() {
queue.Push(frame1, false)
Expect(queue.Len()).To(Equal(1))
frame := queue.Pop(6)
Expect(frame).ToNot(BeNil())
Expect(queue.Len()).To(Equal(1))
frame = queue.Pop(100)
Expect(frame).ToNot(BeNil())
Expect(queue.Len()).To(BeZero())
})
It("gets the whole data of a frame, when it was split", func() {
length := len(frame1.Data)
origdata := make([]byte, length)
copy(origdata, frame1.Data)
queue.Push(frame1, false)
frame := queue.Pop(6)
nextframe := queue.Pop(1000)
Expect(len(frame.Data) + len(nextframe.Data)).To(Equal(length))
data := make([]byte, length)
copy(data, frame.Data)
copy(data[len(frame.Data):], nextframe.Data)
Expect(data).To(Equal(origdata))
})
})
})
Context("Front", func() {
It("returns nil for an empty queue", func() {
Expect(queue.Front()).To(BeNil())
Expect(queue.front()).To(BeNil())
})
It("returns normal frames if no prio frames are available", func() {
queue.Push(frame1, false)
queue.Push(frame2, false)
Expect(queue.Front()).To(Equal(frame1))
Expect(queue.Front()).To(Equal(frame1))
frame, isPrioFrame := queue.front()
Expect(isPrioFrame).To(BeFalse())
Expect(frame).To(Equal(frame1))
frame, isPrioFrame = queue.front()
Expect(isPrioFrame).To(BeFalse())
Expect(frame).To(Equal(frame1))
})
It("returns prio frames first", func() {
@@ -150,7 +221,9 @@ var _ = Describe("streamFrameQueue", func() {
queue.Push(frame1, false)
queue.Push(frame2, false)
queue.Push(prioFrame2, true)
Expect(queue.Front()).To(Equal(prioFrame1))
frame, isPrioFrame := queue.front()
Expect(isPrioFrame).To(BeTrue())
Expect(frame).To(Equal(prioFrame1))
})
})
})